solidified base setup V2

This commit is contained in:
gac-x07mb
2024-07-25 14:35:28 +02:00
parent 1c2722e384
commit 54435e799d
10 changed files with 135 additions and 966 deletions

View File

@ -1 +1,2 @@
from .phoenix import PhoenixBL_from_iphyhon_plugin
#to load plugins in this directory upon bec startup, make import in
# bec_iphyton_client/startup.py

View File

@ -1,88 +0,0 @@
#from unittest import mock
import numpy as np
#import pandas
#import pytest
#from bec_lib import messages
#import device_server
#from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import FormattedComponent as FCpt
#from ophyd import Kind, PVPositioner, Signal
#from ophyd.flyers import FlyerInterface
#from ophyd.pv_positioner import PVPositionerComparator
#from ophyd.status import DeviceStatus, SubscriptionStatus
from bec_lib.logger import bec_logger
logger = bec_logger.logger
import time as tt
#import ophyd
import os
import sys
#logger = bec_logger.logger
# load simulation
#bec.config.load_demo_config()
# .. define base path for directory with scripts
#
ophyd_devices_path='/data/test/x07mb-test-bec/bec_deployment/ophyd_devices'
p_path='/data/test/x07mb-test-bec/bec_deployment/bec_server_venv/lib/python3.11/site-packages'
if ophyd_devices_path not in sys.path:
sys.path.insert(1, os.path.expandvars(ophyd_devices_path))
#endif
if p_path not in sys.path:
sys.path.insert(1, os.path.expandvars(p_path))
class PhoenixBL_from_iphyhon_plugin():
"""
General class for PHOENIX beamline in ipython_plugins
"""
#define some epics channels
#scan_name = "phoenix_base"
def __init__(self):
"""
init PhoenixBL() in ConfigPHOENIX.config.phoenix
"""
import os
#from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import Component as Cpt
#self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
#self.ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
#self.DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
#self.SIG = Cpt(EpicsSignal,name='we',read_pv="X07MB-OP2-SAI_07:MEAN")
#self.SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
#self.CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
# load local configuration
print('init PhoenixBL')
self.path_scripts_local = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/MyScripts/'
self.path_config_local = self.path_scripts_local + 'ConfigPHOENIX/' # base dir for local configurations
self.path_devices_local = self.path_config_local + 'device_config/' # local yamal file
self.file_device_conf = self.path_devices_local + 'phoenix_devices.yaml'
#bec.config.update_session_with_file(self.file_device_conf)
# last command created yaml backup, for now just move it away
#os.system('mv *.yaml '+Devices_local+'/recovery_configs')
#os.system('mv *.yaml tmp')
def read_def_config():
bec.config.update_session_with_file(self.file_device_conf)
def print_setup(self):
"""
docstring print_setup
"""
print(self.path_scripts_local)

View File

@ -34,10 +34,70 @@ to setup the prompts.
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
import sys
# SETUP PROMPTS
bec._ip.prompts.username = "PHOENIX"
bec._ip.prompts.status = 1
from phoenix_bec.bec_ipython_client.plugins.phoenix import PhoenixBL_from_iphyhon_plugin
from phoenix_bec.devices.falcon_csaxs import FalconHDF5Plugins
# make sure that edited modules are reloaded when changed
print('post_startup.py : set autoreload of modules')
bec._ip.run_line_magic("load_ext","autoreload")
bec._ip.run_line_magic("autoreload", "2")
###########
#
#
# next lines are not ideal and likely need to be removed they are a temporaty fix as we had trouble finding certain paths.
#
# we need to work in server_env, otherwise no server is found
# path to ophyd devices is only way to find default ophyd devices
# path to python packages not ideal but only way to add pyp5 and pandas packages
#
##########
print('post_startup.py : set some paths as temp fix. this needs to be solved ')
ophyd_devices_path='/data/test/x07mb-test-bec/bec_deployment/ophyd_devices'
p_path='/data/test/x07mb-test-bec/bec_deployment/bec_server_venv/lib/python3.11/site-packages'
print('add',ophyd_devices_path)
print('add',p_path)
if ophyd_devices_path not in sys.path:
sys.path.insert(1, os.path.expandvars(ophyd_devices_path))
#endif
if p_path not in sys.path:
sys.path.insert(1, os.path.expandvars(p_path))
#endif
print('init phoenix=PhoenixBL()')
phoenix=PhoenixBL()
#... provide info about currently uses paths and directories
phoenix.show_phoenix_setup()
"""
print('')
print('Current setup for bec --- to be professionanlized ')
print('use ')
print('/phoenix_bec/phoenix_bec/bec_ipython_client/startup/post_startup.py')
print('..................... for commands to start/init bec iphython shell')
print('......................here we init phoenix=PhoenixBL()')
print('')
print('/bec_deployment/phoenix_bec/phoenix_bec/scripts ')
print(' --- autoloaded scripts directory ')
print(' ... file PhoenixBL in phoenix.py defines BL core functions ')
print('')
print('/bec_deployment/phoenix_bec/phoenix_bec/devices')
print(' .... yamal files for device ')
print('/bec_deployment/phoenix_bec/phoenix_bec/local_scripts')
print(' .... collection of local scripts for testing purposes')
print('re run this file by ')
print('phoenix_bec/bec_ipython_client/startup/post_startup.py')
"""
#from phoenix_bec.bec_ipython_client.plugins.phoenix import Phoenix
#from phoenix_bec.devices.falcon_phoenix_no_hdf5 import FalconHDF5Plugins

View File

@ -1,36 +1,61 @@
falcon:
description: Falcon detector x-ray fluoresence
deviceClass: csaxs_bec.devices.epics.falcon_csaxs.FalconcSAXS
#falcon_nohdf5:
# description: Falcon detector x-ray fluoresence II
# deviceClass: phoenix_bec.devices.falcon_phoenix_no_hdf5.FalconcSAXS
# deviceConfig:
# prefix: 'X07MB-SITORO:'
# deviceTags:
# - phoenix
# - falcon
# - no hdf5
# - phoenix_devices.yaml
# onFailure: buffer
# enabled: true
# readoutPriority: async
# softwareTrigger: false
xmap_nohdf5:
description: Falcon detector x-ray fluoresence II
deviceClass: phoenix_bec.devices.xmap_phoenix_no_hdf5.XMAPphoenix
deviceConfig:
prefix: 'X07MB-SITORO:'
prefix: 'X07MB-XMAP:'
deviceTags:
- cSAXS
- falcon
- phoenix
- xmap
- no hdf5
- phoenix_devices.yaml
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: false
ScanYY: #
#
# MOTORS ES1
#
ScanXX:
ScanX:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanX'
prefix: 'X07MB-ES-MA1:ScanX'
deviceTags:
- ES-MA1
- phoenix_devices.yaml
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
ScanYY:
ScanY:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanY'
prefix: 'X07MB-ES-MA1:ScanY'
deviceTags:
- ES-MA1
- phoenix_devices.yaml
onFailure: retry
enabled: true
readOnly: false
@ -46,10 +71,11 @@ SAI_07_MEAN:
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_07:MEAN'
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_07:MEAN'
deviceTags:
- PHOENIX
- PHOENIX
- phoenix_devices.yaml
onFailure: buffer
enabled: true
readOnly: true
@ -63,7 +89,8 @@ SAI_08_MEAN:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_08:MEAN'
deviceTags:
- PHOENIX
- PHOENIX
- phoenix_devices.yaml
onFailure: buffer
enabled: true
readOnly: true

View File

@ -1 +1 @@
from .falcon_csaxs import FalconcSAXS
from .falcon_phoenix_no_hdf5 import FalconcSAXS

View File

@ -1,363 +0,0 @@
#
#
# changes version for PHOENIX WITHOUT HDF5 plugin to be revised/renamed...
#
#
import enum
import os
import threading
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.mca import EpicsMCARecord
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class FalconError(Exception):
"""Base class for exceptions in this module."""
class FalconTimeoutError(FalconError):
"""Raised when the Falcon does not respond in time."""
class DetectorState(enum.IntEnum):
"""Detector states for Falcon detector"""
DONE = 0
ACQUIRING = 1
class TriggerSource(enum.IntEnum):
"""Trigger source for Falcon detector"""
USER = 0
GATE = 1
SYNC = 2
class MappingSource(enum.IntEnum):
"""Mapping source for Falcon detector"""
SPECTRUM = 0
MAPPING = 1
class EpicsDXPFalcon(Device):
"""
DXP parameters for Falcon detector
Base class to map EPICS PVs from DXP parameters to ophyd signals.
"""
elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
# Energy Filter PVs
energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold")
min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation")
detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True)
scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor")
risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization")
# Misc PVs
detector_polarity = Cpt(EpicsSignalWithRBV, "DetectorPolarity")
decay_time = Cpt(EpicsSignalWithRBV, "DecayTime")
current_pixel = Cpt(EpicsSignalRO, "CurrentPixel")
class FalconHDF5Plugins(Device):
"""
HDF5 parameters for Falcon detector
Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.
"""
""" ----------------------------------------------------------------------------
capture = Cpt(EpicsSignalWithRBV, "Capture")
enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config")
xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config")
lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'")
temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True)
file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config")
file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config")
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config")
file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config")
queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config")
array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config")
"""
class FalconSetup(CustomDetectorMixin):
"""
Falcon setup class for cSAXS
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize Falcon detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_default_parameter(self) -> None:
"""
Set default parameters for Falcon
This will set:
- readout (float): readout time in seconds
- value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro
"""
self.parent.value_pixel_per_buffer = 20
self.update_readout_time()
def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
readout_time = (
self.parent.scaninfo.readout_time
if hasattr(self.parent.scaninfo, "readout_time")
else self.parent.MIN_READOUT
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector(self) -> None:
"""Initialize Falcon detector"""
self.stop_detector()
self.stop_detector_backend()
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
# 1 Realtime
self.parent.preset_mode.put(1)
# 0 Normal, 1 Inverted
self.parent.input_logic_polarity.put(0)
# 0 Manual 1 Auto
self.parent.auto_pixels_per_buffer.put(0)
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Falcon."""
w=0
#----------------------------------------------------------------------
#self.parent.hdf5.enable.put(1)
# file location of h5 layout for cSAXS
#self.parent.hdf5.xml_file_name.put("layout.xml")
# TODO Check if lazy open is needed and wanted!
#self.parent.hdf5.lazy_open.put(1)
#self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
#self.parent.hdf5.queue_size.put(2000)
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
#self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
self.parent.pixels_per_run.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
w=9
""" --------------------------------------------------------------
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
file_path, file_name = os.path.split(self.parent.filepath.get())
self.parent.hdf5.file_path.put(file_path)
self.parent.hdf5.file_name.put(file_name)
self.parent.hdf5.file_template.put("%s%s")
self.parent.hdf5.num_capture.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.hdf5.file_write_mode.put(2)
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
self.parent.hdf5.array_counter.put(0)
# Start file writing
self.parent.hdf5.capture.put(1)
"""
def arm_acquisition(self) -> None:
"""Arm detector for acquisition"""
self.parent.start_all.put(1)
signal_conditions = [
(
lambda: self.parent.state.read()[self.parent.state.name]["value"],
DetectorState.ACQUIRING,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
raise FalconTimeoutError(
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def on_unstage(self) -> None:
"""Unstage detector and backend"""
pass
def on_complete(self) -> None:
"""Complete detector and backend"""
#------------------------------------------------------------------
#self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
#self.publish_file_location(done=True, successful=True)
w=9
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
#self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
#-------------------------------------------------------------------
#signal_conditions = [
# (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
#]
#if not self.wait_for_signals(
# signal_conditions=signal_conditions,
# timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
# all_signals=False,
#):
# # Retry stop detector and wait for remaining time
# raise FalconTimeoutError(
# f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
# )
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
#self.parent.hdf5.capture.put(0)
w=0
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
with self._lock:
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
# (self.parent.hdf5.array_counter.get, total_frames), ---------------------
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.parent.collect_mode.put(mapping)
self.parent.pixel_advance_mode.put(trigger)
self.parent.ignore_gate.put(ignore_gate)
class FalconcSAXS(PSIDetectorBase):
"""
Falcon Sitoro detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector
mca (EpicsMCARecord) : MCA parameters for Falcon detector
hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector
MIN_READOUT (float) : Minimum readout time for the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = FalconSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
mca = Cpt(EpicsMCARecord, "mca1")
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
stop_all = Cpt(EpicsSignal, "StopAll")
erase_all = Cpt(EpicsSignal, "EraseAll")
start_all = Cpt(EpicsSignal, "StartAll")
state = Cpt(EpicsSignal, "Acquiring")
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
preset_real = Cpt(EpicsSignal, "PresetReal")
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity")
auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer")
pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer")
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
if __name__ == "__main__":
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)

View File

@ -1,57 +0,0 @@
#
# MOTORS ES1
#
ScanXX:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanX'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
ScanYY:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanY'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
#
#
# DIODES from ES1 ADC
#
#
SAI_07_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_07:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
SAI_08_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_08:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false

View File

@ -14,6 +14,7 @@ Description of current setup local_scripts/Documentation/Current_Setup.txt
/bec_deployment/phoenix_bec/phoenix_bec/scripts
.. autoloaded scripts directory
.. for solidified scritps
.. file PhoenixBL in phoenix.py defines BL core functions
/bec_deployment/phoenix_bec/phoenix_bec/devices
@ -21,6 +22,7 @@ Description of current setup local_scripts/Documentation/Current_Setup.txt
/bec_deployment/phoenix_bec/phoenix_bec/local_scripts
.. collection of local scripts for testing purposes
.. all local configurations start name with LOCAL to minimize confusion
to run startup file:

View File

@ -1,419 +0,0 @@
This file contains a selection of base classes to remember where to find them and how they look
#########################################################################################
#
#
# PART I device classes psi_detector_base.py
# CONTAINS
# DetectorInitErroR(Exception)
# CustomDetectorMixing
# PSIDetectorBase(Device)
#
#
#########################################################################################
psi_detector_base.py
https://gitlab.psi.ch/bec/ophyd_devices/-/blob/main/ophyd_devices/interfaces/base_classes/psi_detector_base.py
""This module contains the base class for SLS detectors. We follow the approach to integrate
PSI detectors into the BEC system based on this base class. The base class is used to implement
certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
We use composition with a custom prepare class to implement BL specific logic for the detector.
The beamlines need to inherit from the CustomDetectorMixing for their mixin classes."""
file psi_detector_base.py
import os
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
class CustomDetectorMixin:
"""
Mixin class for custom detector logic
This class is used to implement BL specific logic for the detector.
It is used in the PSIDetectorBase class.
For the integration of a new detector, the following functions should
help with integrating functionality, but additional ones can be added.
Check PSIDetectorBase for the functions that are called during relevant function calls of
stage, unstage, trigger, stop and _init.
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def on_init(self) -> None:
"""
Init sequence for the detector
"""
_base.py
ready has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
def on_pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan starts.
Only use if needed, and it is recommended to keep this function as short/fast as possible.
"""
def on_complete(self) -> None | DeviceStatus:
"""
Specify actions to be executed when the scan is complete.
This can for instance be to check with the detector and backend if all data is written succsessfully.
"""
def publish_file_location(self, done: bool, successful: bool, metadata: dict = None) -> None:
"""
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
metadata (dict): additional metadata to publish
"""
if metadata is None:
metadata = {}
msg = messages.FileMessage(
file_path=self.parent.filepath.get(),
done=done,
successful=successful,
metadata=metadata,
)
pipe = self.parent.connector.pipeline()
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def wait_for_signals(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""
Convenience wrapper to allow waiting for signals to reach a certain condition.
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
>>> Example usage for EPICS PVs:
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self.parent.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if timer > timeout:
return False
time.sleep(interval)
timer += interval
def wait_with_status(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"),
) -> DeviceStatus:
"""Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
"""
status = DeviceStatus(self.parent)
# utility function to wrap the wait_for_signals function
def wait_for_signals_wrapper(
status: DeviceStatus,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool,
interval: float,
all_signals: bool,
exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"),
):
"""Convenient wrapper around wait_for_signals to set status based on the result.
Args:
status (DeviceStatus): DeviceStatus object to be set
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
"""
try:
result = self.wait_for_signals(
signal_conditions, timeout, check_stopped, interval, all_signals
)
if result:
status.set_finished()
else:
status.set_exception(exception_on_timeout)
except Exception as exc:
status.set_exception(exc=exc)
thread = threading.Thread(
target=wait_for_signals_wrapper,
args=(
status,
signal_conditions,
timeout,
check_stopped,
interval,
all_signals,
exception_on_timeout,
),
daemon=True,
)
thread.start()
return status
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
"""
filepath = Component(SetableSignal, value="", kind=Kind.config)
custom_prepare_cls = CustomDetectorMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
self.stopped = False
self.name = name
self.service_cfg = None
self.scaninfo = None
self.filewriter = None
if not issubclass(self.custom_prepare_cls, CustomDetectorMixin):
raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin")
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if device_manager:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "."
self.service_cfg = {"base_path": os.path.abspath(base_path)}
self.connector = self.device_manager.connector
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config
If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
"""
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return
self.service_cfg = {"base_path": os.path.abspath(".")}
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.on_init()
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan.
First we check if the device is already staged. Stage is idempotent,
if staged twice it should raise (we let ophyd.Device handle the raise here).
We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage()
return super().stage()
def pre_scan(self) -> None:
"""Pre-scan logic.
This function will be called from BEC directly before the scan core starts, and should only implement
time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out.
"""
self.custom_prepare.on_pre_scan()
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus):
return status
return super().trigger()
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan.
We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
If that is the case, the stopped flag is set to True, which will immediately unstage the device.
Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
Returns:
list(object): list of objects that were unstaged
"""
self.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.on_unstage()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
"""
self.custom_prepare.on_stop()
super().stop(success=success)
self.stopped = True

View File

@ -27,7 +27,7 @@ import sys
# .. define base path for directory with scripts
class PhoenixBL_from_phoenic_bec_scripts():
class PhoenixBL():
"""
General class for PHOENIX beamline from_phoenic_bec_scripts
@ -41,7 +41,12 @@ class PhoenixBL_from_phoenic_bec_scripts():
init PhoenixBL() in phoenix_bec/scripts
"""
import os
import os
print('init PhoenixBL from phoenix_bec/scripts/phoenix.py')
#from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import Component as Cpt
#self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
@ -53,34 +58,35 @@ class PhoenixBL_from_phoenic_bec_scripts():
# load local configuration
print('init PhoenixBL after ')
self.path_scripts_local = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/MyScripts/'
self.path_scripts_local = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/phoenix_bec/local_scripts/'
self.path_config_local = self.path_scripts_local + 'ConfigPHOENIX/' # base dir for local configurations
self.path_devices_local = self.path_config_local + 'device_config/' # local yamal file
self.file_device_conf = self.path_devices_local + 'phoenix_devices.yaml'
self.file_device_file_local = self.path_devices_local + 'phoenix_devices.yaml'
self.path_phoenix_bec = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/'
self.file_phoenix_devices_file = self.path_phoenix_bec+'phoenix_bec/device_configs/phoenix_devices.yaml'
#bec.config.update_session_with_file(self.file_device_conf)
# last command created yaml backup, for now just move it away
#os.system('mv *.yaml '+Devices_local+'/recovery_configs')
#os.system('mv *.yaml tmp')
def read_def_config(self):
bec.config.update_session_with_file(self.file_device_conf)
def print_setup(self):
"""
docstring print_setup
"""
def read_phoenix_config(self):
print('read file ')
print(self.file_phoenix_devices_file)
bec.config.update_session_with_file(self.file_phoenix_devices_file)
print(self.path_scripts_local)
def show_phoenix_setup(self):
print(self.path_phoenix_bec)
os.system('cat '+self.path_phoenix_bec+'phoenix_bec/local_scripts/Documentation/Current_setup.txt')
def test_func():
print('ttt')
print('ttpsssyt')