Add magic to post_startup.py to restart bec server from command line, first version of TTL Trigger device

This commit is contained in:
gac-x07mb
2024-08-23 13:37:46 +02:00
parent 6572a71f3b
commit c77f359451
15 changed files with 2895 additions and 125 deletions

View File

@ -36,6 +36,7 @@ to setup the prompts.
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
import time as tt
import sys
import os
from IPython.core.magic import register_line_magic
@ -132,6 +133,10 @@ def ph_load_config(line):
print('elapsed time:', tt.time()-t0)
#enddef
@register_line_magic
def ph_restart_bec_server(line):
os.system('bec-server restart')
os.system('gnome-terminal --geometry 120X50 -- bash -c "bec-server attach; exec bash"')
##@register_line_magic
#def ph_post_startup(line):

View File

@ -3,23 +3,15 @@
# phoenix standard devices (motors)
#
#
#####################################################
####################
#
# TRIGGER/Delay
#
###################
phoenix_trigger:
description: Trigger
####################################################:
TTL:
description: PHOENIX TTL trigger
deviceClass: phoenix_bec.devices.phoenix_trigger.PhoenixTrigger
deviceConfig:
prefix: 'X07MB-OP2:'
deviceTags:
- phoenix
- trigger
- TTL Trigger
- phoenix_devices.yaml
onFailure: buffer
enabled: true

View File

@ -1 +1 @@
from PhoenixTrigger import PhoenixTrigger
from .phoenix_trigger import PhoenixTrigger

View File

@ -256,7 +256,7 @@ class FalconSetup(CustomDetectorMixin):
#):
# # Retry stop detector and wait for remaining time
# raise FalconTimeoutError(
# f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
# f"Failed to stop detector, timeou t with state {signal_conditions[0][0]}"
# )
def stop_detector_backend(self) -> None:

View File

@ -1,3 +1,5 @@
import time
from ophyd import (
ADComponent as ADCpt,
Device,
@ -18,10 +20,6 @@ DETECTOR_TIMEOUT = 5
#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the PHOENIX trigger setup.
@ -29,11 +27,38 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
"""
#self.acquire = self.parent.smpl.put(1)
#self.continuous_sampling_on = parent.start_cmpl.put(1)
#self.continuous_sampling_off = self.parent.start_cmpl.put(0)
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
self._counter = 0
WW
def on_acquire(self):
self.parent.smpl.put(1)
print('on_aquire')
def on_cont_sample_on(self):
self.parent.start_csmpl.put(1)
print('on_cont_sample_on')
def on_cont_sample_off(self):
self.parent.start_csmpl.put(0)
print('on_cont_sample_off')
def on_done(self):
done = self.parent.smpl_done.get()
return done
def on_dwell(self,t):
" calculate cycles from time in sec "
cycles=self.parent.total_cycles.put(0)*5
def on_stage(self):
# is this called on each point in scan or just before scan ???
print('on stage')
@ -43,23 +68,26 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
time.sleep(0.05)
cycles=self.parent.total_cycles.put(0)
time.sleep(0.05)
cycles=self.parent.smpl.put(2)
cycles=self.parent.smpl.put(1)
time.sleep(0.5)
cycles=self.parent.total_cycles.put(cycles)
logger.success('PhoenixTrigger on stage')
def on_trigger(self):
print('on_trigger')
self.parent.start_smpl.put(1)
logger.success('PhoenixTrigger on_trigger')
return self.wait_with_status(
[(self.parent.smpl_done.get, 1)])
#def on_trigger(self):
# print('on_trigger')
# self.parent.start_smpl.put(1)
# logger.success('PhoenixTrigger on_trigger')
#
# return self.wait_with_status(
# [(self.parent.smpl_done.get, 1)])
# logger.success(' PhoenixTrigger on_trigger complete ')
# if success:
@ -70,67 +98,57 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
def on_complete(self):
print('on_complete')
timeout =10
#def on_complete(self):
# print('on_complete')
# timeout =10
logger.success('XXXX complete %d XXXX' % success)
# logger.success('XXXX complete %d XXXX' % success)
success = self.wait_for_signals(
[
(self.parent.smpl_done.get, 0))
],
timeout,
check_stopped=True,
all_signals=True
)
# success = self.wait_for_signals(
# [
# (self.parent.smpl_done.get, 0)
# ],
# timeout,
# check_stopped=True,
# all_signals=True
# )
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
def on_stop(self):
logger.success(' PhoenixTrigger on_stop ')
self.parent.csmpl.put(1)
logger.success(' PhoenixTrigger on_stop finished ')
def on_unstage(self):
logger.success(' PhoenixTrigger on_unstage ')
self.parent.csmpl.put(1)
self.parent.smpl.put(1)
logger.success(' PhoenixTrigger on_unstage finished ')
# hoenixTrigger on_unstage ')
# self.parent.csmpl.put(1)
# self.parent.smpl.put(1)
# logger.success(' PhoenixTrigger on_unstage finished ')
#def on_trigger():
# print('on_trigger')
class PhoenixTrigger(PSIDetectorBase):
"""
Docstring:
Class for PHOENIX TTL hardware trigger
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
custom_prepare_cls (PhoenixTriggerSetup) : Custom setup for TTL trigger at PHOENIX
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector
mca (EpicsMCARecord) : MCA parameters for XMAP detector
hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector
MIN_READOUT (float) : Minimum readout time for the detector
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
@ -138,6 +156,8 @@ class PhoenixTrigger(PSIDetectorBase):
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
@ -147,12 +167,6 @@ class PhoenixTrigger(PSIDetectorBase):
device_manager=None,
**kwargs,
)
Docstring:
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
@ -167,13 +181,66 @@ class PhoenixTrigger(PSIDetectorBase):
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: EpicsSignal
"""
##################################################################
# Specify which functions are revealed to the user in BEC client
# only a set of predefined functions will be visible in dev.TTL
# The Variable USER_ACCESS contains an ascii list of functions which will be
# visible in dev.TTL as well
# Alternatively one couls also create 2nd instance of PhoenixTrigger,
# which is probably not ideal
USER_ACCESS = ["a_acquire"
,"a_cont_sample_on"
,"a_cont_sample_off"
,"prefix"
,"a_done"]
#####################################################################
# specify Setup class into variable custom_prepare_cls
# in __init__ of PSIDetectorBase will the initialzed by
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# making the instance of PSIDetectorBase availble in functions
custom_prepare_cls = PhoenixTriggerSetup
start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off
#############################################################3
# Now use component to provide channel access
# when PhoenixTrigger is initialized, the parameters of the base class are
# inherided, most notable prefix, which is here X07MB-OP2:
# The input of Component=Cpt is Cpt(deviceClass,suffix)
# if Cpt is used in a class, which has interited Device, here via:
# (Here PhoenixTrigger <-- PSIDetectorBase <- Device
# the Cpt will construct - magically- the Epics channel name
# EpicsPV = prefix+suffix,
# for example
# 'X07MB-OP2:' + 'START-CSMPL' -> 'X07MB-OP2:' + 'START-CSMPL'
#
start_csmpl = Cpt(EpicsSignal, 'START-CSMPL') # cont on / off
intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up
total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set
smpl = Cpt(EpicsSignal,'SMPL') # start sampling --> aquire
smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done
# link to reasonable names
# start with a_ to see functions quicklz in listing
#
#
def a_acquire(self):
self.custom_prepare.on_acquire()
def a_cont_sample_on(self):
self.custom_prepare.on_cont_sample_on()
def a_cont_sample_off(self):
self.custom_prepare.on_cont_sample_off()
def a_done(self):
done=self.custom_prepare.on_done()
return done
def a_dwell(self):
self.custom_prepare.on_dwell()

View File

@ -0,0 +1,182 @@
from ophyd import (
ADComponent as ADCpt,
Device,
DeviceStatus,
)
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
import time
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the PHOENIX trigger setup.
"""
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
self._counter = 0
WW
def on_stage(self):
# is this called on each point in scan or just before scan ???
print('on stage')
self.parent.start_csmpl.put(0)
time.sleep(0.05)
cycles=self.parent.total_cycles.get()
time.sleep(0.05)
cycles=self.parent.total_cycles.put(0)
time.sleep(0.05)
cycles=self.parent.smpl.put(2)
time.sleep(0.5)
cycles=self.parent.total_cycles.put(cycles)
logger.success('PhoenixTrigger on stage')
def on_trigger(self):
self.parent.start_smpl.put(1)
time.sleep(0.05) # use blocking
logger.success('PhoenixTrigger on_trigger')
return self.wait_with_status(
[(self.parent.smpl_done.get, 1)])
# logger.success(' PhoenixTrigger on_trigger complete ')
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
def on_complete(self):
timeout =10
logger.success('XXXX complete %d XXXX' % success)
success = self.wait_for_signals(
[
(self.parent.smpl_done.get, 0))
],
timeout,
check_stopped=True,
all_signals=True
)
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
def on_stop(self):
logger.success(' PhoenixTrigger on_stop ')
self.parent.csmpl.put(1)
logger.success(' PhoenixTrigger on_stop finished ')
def on_unstage(self):
logger.success(' PhoenixTrigger on_unstage ')
self.parent.csmpl.put(1)
self.parent.smpl.put(1)
logger.success(' PhoenixTrigger on_unstage finished ')
class PhoenixTrigger(PSIDetectorBase):
"""
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector
mca (EpicsMCARecord) : MCA parameters for XMAP detector
hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector
MIN_READOUT (float) : Minimum readout time for the detector
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
To adress chanels such as 'X07MB-OP2:SMPL-DONE':
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
name,
kind=None,
parent=None,
device_manager=None,
**kwargs,
)
Docstring:
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: EpicsSignal
"""
custom_prepare_cls = PhoenixTriggerSetup
start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off
intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up
total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set
smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done

View File

@ -310,10 +310,7 @@ class XMAPSetup(CustomDetectorMixin):
class XMAPphoenix(PSIDetectorBase):
"""MCA
XMAP detector for phoenix
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetu
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
in __init__ of PSIDetecor base
@ -337,9 +334,9 @@ class XMAPphoenix(PSIDetectorBase):
dxp = Cpt(EpicsDXPXMAP, "dxp1:")
mca1 = Cpt(EpicsMCARecord, "mca1")
mca2 = Cpt(EpicsMCARecord, "mca2")
mca3 = Cpt(EpicsMCARecord, "mca3")
mca4 = Cpt(EpicsMCARecord, "mca4")
#mca2 = Cpt(EpicsMCARecord, "mca2")
#mca3 = Cpt(EpicsMCARecord, "mca3")
#mca4 = Cpt(EpicsMCARecord, "mca4")
print('load hdf5')
#hdf5 = Cpt(XMAPHDF5Plugins, "HDF1:")
@ -362,5 +359,11 @@ class XMAPphoenix(PSIDetectorBase):
auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer")
pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer")
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
#nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
print('DONE connecton chanels in XMAPphoenix')
def aaaa(self):
print('aaaa')

View File

@ -0,0 +1,435 @@
FILE ophyd_devices/ophy_devices/devices/interfaces/base_classes
"""This module contains the base class for SLS detectors. We follow the approach to integrate
PSI detectors into the BEC system based on this base class. The base class is used to implement
certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
We use composition with a custom prepare class to implement BL specific logic for the detector.
The beamlines need to inherit from the Custoon_
import threading
import time
import traceback
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger
from ophyd import Component, Device, DeviceStatus, Kind
from ophyd.device import Staged
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
logger = bec_logger.logger
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
class CustomDetectorMixin:
"""
Mixin class for custom detector logic
This class is used to implement BL specific logic for the detector.
It is used in the PSIDetectorBase class.
For the integration of a new detector, the following functions should
help with integrating functionality, but additional ones can be added.
Check PSIDetectorBase for the functions that are called during relevant function calls of
stage, unstage, trigger, stop and _init.
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def on_init(self) -> None:
"""
Init sequence for the detector
"""
def on_stage(self) -> None:
"""
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
def on_pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan starts.
Only use if needed, and it is recommended to keep this function as short/fast as possible.
"""
def on_complete(self) -> None | DeviceStatus:
"""
Specify actions to be executed when the scan is complete.
This can for instance be to check with the detector and backend if all data is written succsessfully.
"""
def publish_file_location(self, done: bool, successful: bool, metadata: dict = None) -> None:
"""
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
metadata (dict): additional metadata to publish
"""
if metadata is None:
metadata = {}
msg = messages.FileMessage(
file_path=self.parent.filepath.get(),
done=done,
successful=successful,
metadata=metadata,
)
pipe = self.parent.connector.pipeline()
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def wait_for_signals(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""
Convenience wrapper to allow waiting for signals to reach a certain condition.
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
>>> Example usage for EPICS PVs:
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self.parent.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if timer > timeout:
return False
time.sleep(interval)
timer += interval
def wait_with_status(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
exception_on_timeout: Exception = None,
) -> DeviceStatus:
"""Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
"""
if exception_on_timeout is None:
exception_on_timeout = DeviceTimeoutError(
f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
)
status = DeviceStatus(self.parent)
# utility function to wrap the wait_for_signals function
def wait_for_signals_wrapper(
status: DeviceStatus,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool,
interval: float,
all_signals: bool,
exception_on_timeout: Exception,
):
"""Convenient wrapper around wait_for_signals to set status based on the result.
Args:
status (DeviceStatus): DeviceStatus object to be set
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
"""
try:
result = self.wait_for_signals(
signal_conditions, timeout, check_stopped, interval, all_signals
)
if result:
status.set_finished()
else:
if self.parent.stopped:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
else:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exception_on_timeout)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
)
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exc)
thread = threading.Thread(
target=wait_for_signals_wrapper,
args=(
status,
signal_conditions,
timeout,
check_stopped,
interval,
all_signals,
exception_on_timeout,
),
daemon=True,
)
thread.start()
return status
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
"""
filepath = Component(SetableSignal, value="", kind=Kind.config)
custom_prepare_cls = CustomDetectorMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
self.stopped = False
self.name = name
self.service_cfg = None
self.scaninfo = None
self.filewriter = None
if not issubclass(self.custom_prepare_cls, CustomDetectorMixin):
raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin")
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if device_manager:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "."
self.service_cfg = {"base_path": os.path.abspath(base_path)}
self.connector = self.device_manager.connector
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config
If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
"""
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return
self.service_cfg = {"base_path": os.path.abspath(".")}
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.on_init()
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan.
First we check if the device is already staged. Stage is idempotent,
if staged twice it should raise (we let ophyd.Device handle the raise here).
We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage()
return super().stage()
def pre_scan(self) -> None:
"""Pre-scan logic.
This function will be called from BEC directly before the scan core starts, and should only implement
time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out.
"""
self.custom_prepare.on_pre_scan()
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus):
return status
return super().trigger()
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan.
We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
If that is the case, the stopped flag is set to True, which will immediately unstage the device.
Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
Returns:
list(object): list of objects that were unstaged
"""
self.check_scan_id()
self.custom_prepare.on_unstage()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
"""
self.custom_prepare.on_stop()
super().stop(success=success)
self.stopped = True

View File

@ -13,8 +13,7 @@ from copy import deepcopy
from typing import TYPE_CHECKING, Dict, Literal
from toolz import partition
from typeguard import typechecked
from typeguard import typecheck
from bec_lib import messages
from bec_lib.bec_errors import ScanAbortion
from bec_lib.client import SystemConfig

File diff suppressed because it is too large Load Diff

View File

@ -19,25 +19,15 @@ import importlib
import ophyd
#logger = bec_logger.logger
# load local configuration
#bec.config.load_demo_config()
# .. define base path for directory with scripts
PhoenixBL=0
from ConfigPHOENIX.config.phoenix import PhoenixBL
#from ConfigPHOENIX.devices.falcon_csaxs import FalconSetup
# initialize general parameter
ph=PhoenixBL()
bec.config.update_session_with_file('./ConfigPHOENIX/device_config/phoenix_devices.yaml')
#
phoenix.add_phoenix_config()
#bec.config.update_session_with_file('./ConfigPHOENIX/device_config/phoenix_devices.yaml')
time.sleep(1)
s1=scans.line_scan(dev.ScanX,0,0.002,steps=4,exp_time=1,relative=False,delay=2)
#s1=scans.line_scan(dev.ScanX,0,0.1,steps=4,exp_time=1,relative=False,delay=2)
s2=scans.phoenix_line_scan(dev.ScanX,0,0.002,steps=4,exp_time=.01,relative=False,delay=2)

View File

@ -0,0 +1,8 @@
This diretory is for scripts, test etc. which are not loaded into the server.
Hence no directory should contain a file named
__init__.py
For now we keep it in the phoenix_bec structure, but for operation, such files should be located out side of the
bec_phoenix plugin.

View File

@ -1 +1 @@
from .phoenix_line_scan import PhoenixLineScan
from .phoenix_scans import PhoenixLineScan

View File

@ -22,19 +22,41 @@ but they are executed in a specific order:
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
# imports in ScanBase
#from __future__ import annotations
#import ast
#import enum
#import threading
#import time
#import uuid
#from abc import ABC, abstractmethod
#from typing import Any, Literal
#import numpy as np
#from bec_lib.device import DeviceBase
#from bec_lib.devicemanager import DeviceManagerBase
#from bec_lib.endpoints import MessageEndpoints
#from bec_lib.logger import bec_logger
#from .errors import LimitError, ScanAbortion
#from .path_optimization import PathOptimizerMixin
#from .scan_stubs import ScanStubs
# end imports in ScanBase
# import time
# import numpy as np
# from bec_lib.endpoints import MessageEndpoints
# from bec_lib.logger import bec_logger
from bec_lib.logger import bec_logger
# from bec_lib import messages
# from bec_server.scan_server.errors import ScanAbortion
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
# logger = bec_logger.logger
from bec_server.scan_server.scans import ScanBase, ScanArgType
import numpy as np
import time
@ -42,8 +64,63 @@ from bec_lib.logger import bec_logger
logger = bec_logger.logger
class PhoenixLineScan(ScanBase):
scan_name = "phoenix_line_scanZZZ"
class LogTime():
def __init__(self):
self.t0=time.process_time()
def p_s(self,x):
now=time.process_time()
delta=now-self.t0
m=str(delta)+' sec '+x
logger.success(m)
self.t0=now
ll=LogTime()
class PhoenixScanBaseTTL(ScanBase):
"""
Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan')
"""
ll.p_s('enter scripts.phoenix.scans.PhoenixScanBaseTTL')
def scan_core(self):
"""perform the scan core procedure"""
ll.p_s('PhoenixScanBaseTT.scan_core')
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
ll.p_s('PhoenixScanBaseTT.scan_core in loop ')
yield from self._at_each_point(ind, pos)
self.burst_index = 0
def _at_each_point(self, ind=None, pos=None):
ll.p_s('PhoenixScanBaseTT._at_each_point')
yield from self._move_scan_motors_and_wait(pos)
if ind > 0:
yield from self.stubs.wait(
wait_type="read", group="primary", wait_group="readout_primary"
)
time.sleep(self.settling_time)
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
yield from self.stubs.read(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
yield from self.stubs.wait(
wait_type="read", group="scan_motor", wait_group="readout_primary"
)
self.point_id += 1
ll.p_s('done')
class PhoenixLineScan(PhoenixScanBaseTTL):
ll.p_s('enter scripts.phoenix.scans.PhoenixLineScan')
scan_name = "phoenix_line_scan"
required_kwargs = ["steps", "relative"]
arg_input = {
"device": ScanArgType.DEVICE,
@ -78,38 +155,22 @@ class PhoenixLineScan(ScanBase):
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
"""
ll.p_s('init scripts.phoenix.scans.PhoenixLineScan')
super().__init__(
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
)
self.steps = steps
self.setup_device = setup_device
print('INIT CLASS PhoenixLineScan')
time.sleep(1)
time.sleep(1)
ll.p_s('done')
def _calculate_positions(self) -> None:
ll.p_s('PhoenixLineScan._calculate_positions')
axis = []
for _, val in self.caller_args.items():
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
axis.append(ax_pos)
self.positions = np.array(list(zip(*axis)), dtype=float)
ll.p_s('done')
def _at_each_point(self, ind=None, pos=None):
yield from self._move_scan_motors_and_wait(pos)
if ind > 0:
yield from self.stubs.wait(
wait_type="read", group="primary", wait_group="readout_primary"
)
time.sleep(self.settling_time)
if self.setup_device:
yield from self.stubs.send_rpc_and_wait(self.setup_device, "velocity.set", 1)
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
yield from self.stubs.read(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
yield from self.stubs.wait(
wait_type="read", group="scan_motor", wait_group="readout_primary"
)
self.point_id += 1

View File

@ -28,6 +28,9 @@ logger = bec_logger.logger
# .. define base path for directory with scripts
class PhoenixBL():
"""
#
@ -68,7 +71,7 @@ class PhoenixBL():
print('add xmap ')
print(self.path_devices+'phoenix_xmap.yaml')
bec.config.update_session_with_file(self.path_devices+'phoenix_xmap.yaml',timeout=100)
bec.config.update_session_with_file(self.path_devices+'phoenix_xmap.yaml')#,timeout=100)
def add_falcon(self):
print('add_xmap')