Feat/add devices #8
301
superxas_bec/device_configs/x10da_config_test.yaml
Normal file
301
superxas_bec/device_configs/x10da_config_test.yaml
Normal file
@@ -0,0 +1,301 @@
|
||||
|
||||
#######################################
|
||||
## Beam Monitors 2 and 3 -- Virtual positioners
|
||||
|
||||
bm2_tr1:
|
||||
readoutPriority: baseline
|
||||
description: Beam Monitor 2 Translation 1
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP-BM2:TR1
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
bm2_tr2:
|
||||
readoutPriority: baseline
|
||||
description: Beam Monitor 2 Translation 2
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP-BM2:TR2
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
bm3_tr1:
|
||||
readoutPriority: baseline
|
||||
description: Beam Monitor 3 Translation 1
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP-BM3:TR1
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
bm3_tr2:
|
||||
readoutPriority: baseline
|
||||
description: Beam Monitor 3 Translation 2
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP-BM3:TR2
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
kb_slit_y:
|
||||
readoutPriority: baseline
|
||||
description: KB slit axis Y
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-SV1:OPENY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
##### Ionization chambers
|
||||
ic1:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 1
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_01:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
ic2:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 2
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_02:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
ic3:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 3
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_03:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
ic4:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 4
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_04:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
|
||||
##### Trigger Card #####
|
||||
|
||||
trigger:
|
||||
readoutPriority: baseline
|
||||
description: Trigger Card
|
||||
deviceClass: superxas_bec.devices.trigger.Trigger
|
||||
deviceConfig:
|
||||
prefix: 'X10DA-ES1:'
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: True
|
||||
|
||||
##### Falcon detector #####
|
||||
falcon:
|
||||
readoutPriority: monitored
|
||||
description: Falcon Sitoro detector
|
||||
deviceClass: superxas_bec.devices.falcon.FalconSuperXAS
|
||||
deviceConfig:
|
||||
prefix: 'X10DA-SITORO:'
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
|
||||
|
||||
|
||||
#################################
|
||||
###### EXPERIMENTAL STATION #####
|
||||
#################################
|
||||
|
||||
#######################################
|
||||
## Harmonic Rejection Mirror -- Physical positioners
|
||||
|
||||
hrm_try:
|
||||
readoutPriority: baseline
|
||||
description: Harmonic Rejection Mirror Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-HRM:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
hrm_rotx:
|
||||
readoutPriority: baseline
|
||||
description: Harmonic Rejection Mirror X-Rotation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-HRM:ROX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
#######################################
|
||||
## Ionization Chambers 1, 2, and 3 -- Physical positioners
|
||||
|
||||
ic1_try:
|
||||
readoutPriority: baseline
|
||||
description: Ionization Chamber 1 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-IC1:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
ic2_try:
|
||||
readoutPriority: baseline
|
||||
description: Ionization Chamber 2 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-IC2:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
ic3_try:
|
||||
readoutPriority: baseline
|
||||
description: Ionization Chamber 3 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-IC3:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
#######################################
|
||||
## Sample Manipulator (Old) -- Physical positioners
|
||||
|
||||
ma1_trx:
|
||||
readoutPriority: baseline
|
||||
description: Sample Manipulator 1 X-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MA1:TRX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
ma1_trx1:
|
||||
readoutPriority: baseline
|
||||
description: Sample Manipulator 1 X-Translation 1
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MA1:TRX1
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
ma1_trx2:
|
||||
readoutPriority: baseline
|
||||
description: Sample Manipulator 1 X-Translation 2
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MA1:TRX2
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
ma1_try:
|
||||
readoutPriority: baseline
|
||||
description: Sample Manipulator 1 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MA1:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
# ma1_rot2:
|
||||
# readoutPriority: baseline
|
||||
# description: Sample Manipulator 1 Y-Rotation
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-ES1-MA1:ROT2
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
|
||||
#######################################
|
||||
## Experimental Table 1 and 2 -- Physical positioners
|
||||
|
||||
et1_trx:
|
||||
readoutPriority: baseline
|
||||
description: Experimental Table 1 X-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-ET1:TRX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
et1_try:
|
||||
readoutPriority: baseline
|
||||
description: Experimental Table 1 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-ET1:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
et2_trx:
|
||||
readoutPriority: baseline
|
||||
description: Experimental Table 2 X-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES2-ET2:TRX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
et2_try:
|
||||
readoutPriority: baseline
|
||||
description: Experimental Table 2 Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES2-ET2:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
#######################################
|
||||
## X-Ray Eye -- Physical positioners
|
||||
|
||||
xe1_zoom:
|
||||
readoutPriority: baseline
|
||||
description: X-Ray Eye Zoom
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-XE1:ZOOM
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
xe1_finfoc:
|
||||
readoutPriority: baseline
|
||||
description: X-Ray Eye Fine Focus
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-XE1:FINFOC
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
#######################################
|
||||
## Optics
|
||||
|
||||
## EPICS IOC does not seem to comply to MotorRecord
|
||||
|
||||
mono_energy:
|
||||
readoutPriority: baseline
|
||||
description: X-Ray Eye Zoom
|
||||
deviceClass: ophyd.PVPositioner
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP1-MO1:BraggEAO
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
204
superxas_bec/devices/falcon.py
Normal file
204
superxas_bec/devices/falcon.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""FALCON device implementation for SuperXAS"""
|
||||
|
||||
import enum
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Kind, Signal, StatusBase
|
||||
from ophyd.status import SubscriptionStatus
|
||||
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class FalconAcquiringStatus(int, enum.Enum):
|
||||
"""Status of Falcon"""
|
||||
|
||||
DONE = 0
|
||||
ACQUIRING = 1
|
||||
|
||||
|
||||
class DeadTimeCorrectedCounts(Signal):
|
||||
"""Signal to calculate dead time corrected counts"""
|
||||
|
||||
def __init__(self, name: str, channel: int, **kwargs):
|
||||
"""
|
||||
Initialize DeadTimeCorrectedCounts signal.
|
||||
|
||||
Args:
|
||||
name (str): Name of the signal
|
||||
channel (int): Channel number
|
||||
"""
|
||||
super().__init__(name=name, **kwargs)
|
||||
self._channel = channel
|
||||
self._dead_time = 1.182e-7
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def get(self) -> float:
|
||||
"""Get dead time corrected counts base on signals from dxp and mca of Falcon"""
|
||||
dxp: EpicsDXPFalcon = getattr(self.parent, f"dxp{self._channel}")
|
||||
mca: EpicsMCARecord = getattr(self.parent, f"mca{self._channel}")
|
||||
|
||||
icr = dxp.input_count_rate.get()
|
||||
ocr = dxp.output_count_rate.get()
|
||||
roi = mca.rois.roi0.count.get()
|
||||
ert = mca.elapsed_real_time.get()
|
||||
print(icr, ocr, roi, ert)
|
||||
|
||||
if icr == 0 or ocr == 0:
|
||||
return 0
|
||||
|
||||
# Check that relative change is large enough
|
||||
test = 1e9
|
||||
test_icr = icr
|
||||
n = 0
|
||||
while test > self._dead_time and n < 30:
|
||||
try:
|
||||
true_icr = icr * np.exp(test_icr * self._dead_time)
|
||||
test = (true_icr - test_icr) / test_icr
|
||||
test_icr = true_icr
|
||||
n += 1
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
logger.info(f"Error in computation of signal {self.name}, error: {e}")
|
||||
return 0
|
||||
|
||||
# Return corrected roi counts
|
||||
cor_roi_cnts = 0
|
||||
if ocr * ert != 0:
|
||||
cor_roi_cnts = roi * true_icr / (ocr * ert)
|
||||
return cor_roi_cnts
|
||||
|
||||
|
||||
class FalconControl(Falcon):
|
||||
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
|
||||
|
||||
_default_read_attrs = Falcon._default_read_attrs + (
|
||||
"dxp1",
|
||||
# # "dxp2",
|
||||
"mca1",
|
||||
# # "mca2",
|
||||
"dead_time_cor_cnts1",
|
||||
# # "dead_time_cor_cnts2",
|
||||
)
|
||||
_default_configuration_attrs = Falcon._default_configuration_attrs + (
|
||||
"dxp1",
|
||||
# "dxp2",
|
||||
"mca1",
|
||||
# "mca2",
|
||||
"dead_time_cor_cnts1",
|
||||
# "dead_time_cor_cnts2",
|
||||
)
|
||||
|
||||
# DXP parameters
|
||||
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
|
||||
# dxp2 = Cpt(EpicsDXPFalcon, "dxp2:")
|
||||
|
||||
# MCA record with spectrum data
|
||||
mca1 = Cpt(EpicsMCARecord, "mca1")
|
||||
# mca2 = Cpt(EpicsMCARecord, "mca2")
|
||||
|
||||
# Norm Signal
|
||||
dead_time_cor_cnts1 = Cpt(
|
||||
DeadTimeCorrectedCounts, name="dead_time_cor_cnts", channel=1, kind=Kind.hinted
|
||||
)
|
||||
# dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal)
|
||||
|
||||
|
||||
class FalconSuperXAS(PSIDeviceBase, FalconControl):
|
||||
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
"""
|
||||
Initialize Falcon device.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device
|
||||
prefix (str): Prefix of the device
|
||||
scan_info (ScanInfo): Information about the scan
|
||||
**kwargs: Additional keyword arguments
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._pv_timeout = 1
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self.collect_mode.set(0).wait()
|
||||
self.preset_real_time.set(0).wait()
|
||||
self.stop_all.put(1)
|
||||
if (
|
||||
self.wait_for_condition(
|
||||
lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout
|
||||
)
|
||||
is False
|
||||
):
|
||||
raise TimeoutError("Timeout on Falcon stage")
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
self.stop_all.put(1)
|
||||
self.erase_all.put(1)
|
||||
if (
|
||||
self.wait_for_condition(
|
||||
lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout
|
||||
)
|
||||
is False
|
||||
):
|
||||
raise TimeoutError("Timeout on Falcon unstage")
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.stop_all.put(1)
|
||||
|
||||
def _stop_erase_and_wait_for_acquiring(self) -> DeviceStatus:
|
||||
"""Method called from the Trigger card to reset counts on the Falcon"""
|
||||
|
||||
if self.acquiring.get() != FalconAcquiringStatus.DONE:
|
||||
self.stop_all.put(1)
|
||||
|
||||
def _check_acquiriting(*, old_value, value, **kwargs):
|
||||
if old_value == FalconAcquiringStatus.DONE and value == FalconAcquiringStatus.ACQUIRING:
|
||||
return True
|
||||
return False
|
||||
|
||||
status = SubscriptionStatus(self.acquiring, _check_acquiriting)
|
||||
|
||||
logger.info("Triggering Falcon")
|
||||
self.erase_start.put(1)
|
||||
return status
|
||||
139
superxas_bec/devices/trigger.py
Normal file
139
superxas_bec/devices/trigger.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""SuperXAS Trigger Device"""
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
import enum
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
|
||||
class ContinuousSamplingMode(int, enum.Enum):
|
||||
"""Options for start_csmpl signal"""
|
||||
|
||||
OFF = 0
|
||||
ON = 1
|
||||
|
||||
|
||||
class SamplingDone(int, enum.Enum):
|
||||
"""Status of sampling"""
|
||||
|
||||
RUNNING = 0
|
||||
DONE = 1
|
||||
|
||||
|
||||
class TriggerControl(Device):
|
||||
"""Trigger Device Control PVs at X10DA, prefix: X10DA-ES1:"""
|
||||
|
||||
total_cycles = Cpt(
|
||||
EpicsSignal,
|
||||
suffix="TOTAL-CYCLES",
|
||||
kind=Kind.config,
|
||||
doc="Number of cycles (multiplies by 0.2s)",
|
||||
)
|
||||
start_csmpl = Cpt(
|
||||
EpicsSignal, suffix="START-CSMPL", kind=Kind.config, doc="Continous sampling mode on/off"
|
||||
)
|
||||
smpl = Cpt(
|
||||
EpicsSignal, suffix="SMPL", kind=Kind.config, doc="Sampling Trigger if cont mode is off"
|
||||
)
|
||||
smpl_done = Cpt(
|
||||
EpicsSignalRO, suffix="SMPL-DONE", kind=Kind.config, doc="Done status of trigger"
|
||||
)
|
||||
|
||||
|
||||
class Trigger(PSIDeviceBase, TriggerControl):
|
||||
"""Trigger Device of X10DA (SUPERXAS), prefix: X10DA-ES1:"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.device_manager = device_manager
|
||||
self._pv_timeout = 1
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self.start_csmpl.set(ContinuousSamplingMode.OFF).wait(timeout=self._pv_timeout)
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
if self.scan_info.msg.scan_name != "exafs_scan":
|
||||
self.set_exposure_time(exp_time).wait()
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
status = self.start_csmpl.set(ContinuousSamplingMode.ON)
|
||||
return status
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
falcon = self.device_manager.devices.get("falcon", None)
|
||||
|
||||
if falcon is not None:
|
||||
# pylint: disable=protected-access
|
||||
status = falcon._stop_erase_and_wait_for_acquiring()
|
||||
status.wait()
|
||||
|
||||
started = False
|
||||
|
||||
def _sampling_done():
|
||||
nonlocal started
|
||||
if not started and self.smpl_done.get() == SamplingDone.RUNNING:
|
||||
started = True
|
||||
return False
|
||||
if started and self.smpl_done.get() == SamplingDone.DONE:
|
||||
return True
|
||||
|
||||
return self.smpl_done.get() == SamplingDone.DONE
|
||||
|
||||
self.smpl.put(1)
|
||||
status = self.task_handler.submit_task(_sampling_done, run=True)
|
||||
return status
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.task_handler.shutdown()
|
||||
|
||||
def set_exposure_time(self, value: float) -> DeviceStatus:
|
||||
"""Utility method to set exposure time complying to device logic with cycle of min 0.2s."""
|
||||
cycles = max(int(value * 5), 1)
|
||||
return self.total_cycles.set(cycles)
|
||||
@@ -0,0 +1 @@
|
||||
from .exafs_scan import EXAFSScan
|
||||
131
superxas_bec/scans/exafs_scan.py
Normal file
131
superxas_bec/scans/exafs_scan.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import ScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class EXAFSScan(ScanBase):
|
||||
scan_name = "exafs_scan"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
edge_energy: float,
|
||||
xas_rel_range: list[float] | np.ndarray[float] | None = None,
|
||||
n_points: list[int] | np.ndarray[int] | None = None,
|
||||
k_step: list[bool] | np.ndarray[bool] | None = None,
|
||||
integ_time: list[float] | np.ndarray[float] | None = None,
|
||||
motor: DeviceBase | None = None,
|
||||
settling_time: float = 0.2,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
EXAFS Scan of the mono_energy axix
|
||||
|
||||
Args:
|
||||
edge_energy (float) : Adsorption Edge Energy
|
||||
xas_rel_range (list[float] | np.ndarray[int] | None) : Optinoal, relative range for XAS, Length of list must n_points +1
|
||||
n_points (list[int] | np.ndarray[bool] | None) : Optional, number of points per range
|
||||
...#TODO docstring
|
||||
"""
|
||||
self.edge_energy = edge_energy
|
||||
self.xas_rel_range = xas_rel_range
|
||||
self.n_points = n_points
|
||||
self.k_step = k_step
|
||||
self.integ_time = integ_time
|
||||
self.k_step_conversion = 3.81
|
||||
self._check_and_upated_input_arguments()
|
||||
if motor is None:
|
||||
default_motor = "kb_slit_y"
|
||||
motor = default_motor # TODO Remove that motor, put energy of mono
|
||||
self.motor = motor
|
||||
super().__init__(exp_time=0, relative=False, settling_time=settling_time, **kwargs)
|
||||
|
||||
# Check that trigger device is enabled
|
||||
_dev_trigger_name = "trigger"
|
||||
self._dev_trigger: DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None)
|
||||
if self._dev_trigger is None or self._dev_trigger.enabled == False:
|
||||
raise ValueError(
|
||||
f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}"
|
||||
)
|
||||
# update scan parameters
|
||||
self.scan_parameters["edge_energy"] = self.edge_energy
|
||||
self.scan_parameters["xas_rel_range"] = self.xas_rel_range
|
||||
self.scan_parameters["n_points"] = self.n_points
|
||||
self.scan_parameters["k_step"] = self.k_step
|
||||
self.scan_parameters["integ_time"] = self.integ_time
|
||||
# update readout_priority
|
||||
self.readout_priority = {"monitored": [self.motor]}
|
||||
|
||||
def update_scan_motors(self):
|
||||
self.scan_motors = [self.motor]
|
||||
|
||||
def _check_and_upated_input_arguments(self) -> None:
|
||||
"""
|
||||
Input parameters for the EXAFS scan must be of the same length for n_points, k_step, integ_time
|
||||
and xas_rel_range (-1). This methods checks for this condition, and calculates the integration time
|
||||
for each point in the scan. If the input parameters are not provided with the correct length,
|
||||
it raises a ValueError which indicates which parameters are not matching.
|
||||
"""
|
||||
|
||||
if not all(
|
||||
[
|
||||
len(self.n_points) == len(self.k_step),
|
||||
len(self.n_points) == len(self.integ_time),
|
||||
len(self.n_points) == (len(self.xas_rel_range) - 1), # careful -1
|
||||
]
|
||||
):
|
||||
raise ValueError(
|
||||
f"Input parameters must have matching lengths:\n"
|
||||
f"n_points: {len(self.n_points)}, "
|
||||
f"k_step: {len(self.k_step)}, expected: {len(self.n_points)}, "
|
||||
f"integ_time: {len(self.integ_time)}, expected: {len(self.n_points)}, "
|
||||
f"xas_rel_range: {len(self.xas_rel_range)}, expected: {len(self.n_points)-1} "
|
||||
)
|
||||
|
||||
self.integ_time = np.repeat(np.array(self.integ_time), np.array(self.n_points))
|
||||
|
||||
def _set_position_offset(self):
|
||||
"""Do not set offset"""
|
||||
yield None
|
||||
|
||||
def _calculate_positions(self):
|
||||
positions = []
|
||||
for ii, pnts in enumerate(self.n_points):
|
||||
if self.k_step[ii] is False:
|
||||
positions.extend(
|
||||
np.linspace(
|
||||
self.xas_rel_range[ii], self.xas_rel_range[ii + 1], pnts, endpoint=False
|
||||
).tolist()
|
||||
)
|
||||
else:
|
||||
k_start = np.sqrt(self.xas_rel_range[ii] / self.k_step_conversion)
|
||||
k_stop = np.sqrt(self.xas_rel_range[ii + 1] / self.k_step_conversion)
|
||||
k_pos = np.linspace(k_start, k_stop, pnts, endpoint=False)
|
||||
k_pos = k_pos**2 * self.k_step_conversion
|
||||
positions.extend(k_pos.tolist())
|
||||
|
||||
# Create positions array
|
||||
self.positions = np.vstack(positions)
|
||||
# shift by edge energy
|
||||
self.positions = self.positions + self.edge_energy
|
||||
# Convert to keV
|
||||
self.positions = self.positions / 1e3
|
||||
|
||||
def _at_each_point(self, ind=None, pos=None):
|
||||
yield from self._move_scan_motors_and_wait(pos)
|
||||
time.sleep(self.settling_time)
|
||||
trigger_time = self.integ_time[ind]
|
||||
self.stubs.send_rpc_and_wait(self._dev_trigger, "set_exposure_time", trigger_time)
|
||||
|
||||
# Trigger
|
||||
yield from self.stubs.trigger(min_wait=trigger_time)
|
||||
|
||||
# Readout all monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
|
||||
# Increase point id
|
||||
self.point_id += 1
|
||||
103
tests/tests_devices/test_devices_falcon.py
Normal file
103
tests/tests_devices/test_devices_falcon.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Tests for Falcon device."""
|
||||
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from superxas_bec.devices.falcon import FalconAcquiringStatus, FalconSuperXAS
|
||||
|
||||
# pylint: disable=protected-access
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def falcon():
|
||||
"""Trigger device with mocked EPICS PVs."""
|
||||
name = "falcon"
|
||||
prefix = "X10DA-SITORO:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = FalconSuperXAS(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
def test_devices_falcon(falcon):
|
||||
"""Test init and on_connected methods of Falcon device"""
|
||||
|
||||
assert falcon.prefix == "X10DA-SITORO:"
|
||||
assert falcon.name == "falcon"
|
||||
assert falcon._pv_timeout == 1
|
||||
falcon.on_connected()
|
||||
|
||||
|
||||
def test_devices_falcon_stage(falcon):
|
||||
"""Test on_stage method of Falcon device"""
|
||||
|
||||
falcon.collect_mode.put(1)
|
||||
falcon.preset_real_time.put(1)
|
||||
falcon.stop_all.put(0)
|
||||
falcon.acquiring.put(FalconAcquiringStatus.DONE)
|
||||
# Should resolve with that status
|
||||
falcon.on_stage()
|
||||
assert falcon.collect_mode.get() == 0
|
||||
assert falcon.preset_real_time.get() == 0
|
||||
assert falcon.stop_all.get() == 1
|
||||
# Should timeout
|
||||
falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING)
|
||||
falcon._pv_timeout = 0.1
|
||||
with pytest.raises(TimeoutError):
|
||||
falcon.on_stage()
|
||||
|
||||
|
||||
def test_devices_falcon_unstage(falcon):
|
||||
"""Test on_unstage method of Falcon device"""
|
||||
|
||||
falcon.stop_all.put(0)
|
||||
falcon.erase_all.put(0)
|
||||
falcon.acquiring.put(FalconAcquiringStatus.DONE)
|
||||
# Should resolve with that status
|
||||
falcon.on_unstage()
|
||||
assert falcon.stop_all.get() == 1
|
||||
assert falcon.erase_all.get() == 1
|
||||
# Should timeout
|
||||
falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING)
|
||||
falcon._pv_timeout = 0.1
|
||||
with pytest.raises(TimeoutError):
|
||||
falcon.on_unstage()
|
||||
|
||||
|
||||
def test_devices_falcon_stop(falcon):
|
||||
"""Test on_stop method of Falcon device"""
|
||||
assert falcon.stopped is False
|
||||
falcon.stop_all.put(0)
|
||||
falcon.stop()
|
||||
assert falcon.stopped is True
|
||||
assert falcon.stop_all.get() == 1
|
||||
|
||||
|
||||
def test_devices_falcon_stop_erase_and_wait_for_acquiring(falcon):
|
||||
"""
|
||||
Test _stop_erase_and_wait_for_acquiring method of Falcon device.
|
||||
|
||||
This method is called by the trigger card when the Falcon needs to be reset, and
|
||||
placed in a state where it can receive a trigger again
|
||||
"""
|
||||
# Set initial values to different states
|
||||
falcon.stop_all.put(0)
|
||||
falcon.erase_start.put(0)
|
||||
|
||||
falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING)
|
||||
# If falcon status is acquiring, it should call stop_all
|
||||
status = falcon._stop_erase_and_wait_for_acquiring()
|
||||
assert falcon.stop_all.get() == 1
|
||||
assert falcon.erase_start.get() == 1
|
||||
# The status resolved once it sees acquiring change from DONE to ACQUIRING
|
||||
assert status.done is False
|
||||
falcon.acquiring.put(FalconAcquiringStatus.DONE)
|
||||
falcon.acquiring.set(FalconAcquiringStatus.ACQUIRING).wait()
|
||||
assert status.done is True
|
||||
106
tests/tests_devices/test_devices_trigger.py
Normal file
106
tests/tests_devices/test_devices_trigger.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Tests for Trigger device."""
|
||||
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from superxas_bec.devices.trigger import ContinuousSamplingMode, Trigger
|
||||
|
||||
# pylint: disable=protected-access
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def trigger():
|
||||
"""Trigger device with mocked EPICS PVs."""
|
||||
name = "trigger"
|
||||
prefix = "X10DA-ES1:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = Trigger(name=name, prefix=prefix, device_manager=DMMock())
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.mark.parametrize(["exp_time", "cycles"], [(0.1, 1), (0.5, 2), (2, 10)])
|
||||
def test_devices_trigger_stage_core_scans(trigger, exp_time, cycles):
|
||||
"""Test on_connected method of Trigger device.
|
||||
|
||||
The pytest.mark.parametrize decorator is used to run the test for each parameter in the list.
|
||||
"""
|
||||
assert trigger.prefix == "X10DA-ES1:"
|
||||
assert trigger.name == "trigger"
|
||||
assert trigger._pv_timeout == 1
|
||||
#
|
||||
trigger.on_connected()
|
||||
trigger._pv_timeout = 0.2
|
||||
trigger.start_csmpl.put(ContinuousSamplingMode.ON)
|
||||
assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON
|
||||
|
||||
# Set scan_info information for scan
|
||||
trigger.scan_info.msg.scan_name = "step_scan"
|
||||
trigger.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
|
||||
# On stage should set exposure time
|
||||
status = trigger.stage()
|
||||
if isinstance(status, DeviceStatus):
|
||||
status.wait()
|
||||
assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF
|
||||
# cycles will be multiple of exp_time/0.2 as int, minimum 1.
|
||||
assert trigger.total_cycles.get() == cycles
|
||||
|
||||
|
||||
def test_devices_trigger_unstage(trigger):
|
||||
"""
|
||||
Test on_unstage method of Trigger device.
|
||||
|
||||
This should put start_csmpl to ON.
|
||||
"""
|
||||
trigger.start_csmpl.put(ContinuousSamplingMode.OFF)
|
||||
assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF
|
||||
status = trigger.unstage()
|
||||
status.wait()
|
||||
assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON
|
||||
|
||||
|
||||
def test_devices_trigger_stop(trigger):
|
||||
"""
|
||||
Test on_stop method of Trigger device.
|
||||
|
||||
This should stop the task_handler.
|
||||
"""
|
||||
assert trigger.stopped is False
|
||||
with mock.patch.object(trigger, "task_handler") as mock_handler:
|
||||
trigger.stop()
|
||||
mock_handler.shutdown.assert_called_once()
|
||||
assert trigger.stopped is True
|
||||
|
||||
|
||||
def test_devices_trigger_trigger(trigger):
|
||||
"""Test trigger method of Trigger device."""
|
||||
# TODO we should use the ScanStatusMessage to update scan_info here
|
||||
falcon = mock.MagicMock()
|
||||
falcon.name.return_value = "falcon"
|
||||
status = DeviceStatus(device=falcon)
|
||||
status.set_finished()
|
||||
falcon._stop_erase_and_wait_for_acquiring.return_value = status
|
||||
trigger.device_manager.devices["falcon"] = falcon
|
||||
|
||||
trigger_status = DeviceStatus(device=trigger)
|
||||
trigger_status.set_finished()
|
||||
with mock.patch.object(
|
||||
trigger.task_handler, "submit_task", return_value=trigger_status
|
||||
) as mock_submit:
|
||||
status = trigger.trigger()
|
||||
assert falcon._stop_erase_and_wait_for_acquiring.call_count == 1
|
||||
assert trigger.smpl.get() == 1 # smpl called with 1
|
||||
# TODO check that the task_handler is called with the correct function
|
||||
# This is currently not easily testable
|
||||
assert mock_submit.call_count == 1
|
||||
assert trigger_status == status
|
||||
Reference in New Issue
Block a user