feat: add falcon integration

This commit is contained in:
gac-x10da
2025-03-20 16:39:00 +01:00
committed by appel_c
parent 6992cd3aeb
commit 345807f761

View File

@@ -0,0 +1,179 @@
from ophyd_devices.devices.dxp import Falcon, EpicsMCARecord, EpicsDXPFalcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import DeviceStatus, StatusBase, EpicsSignalRO, Kind, Signal, Component as Cpt
from ophyd.device import DynamicDeviceComponent as DCpt
from ophyd.mca import add_rois
from ophyd.status import SubscriptionStatus
from bec_lib.devicemanager import ScanInfo
import enum
import math
import numpy as np
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
""" Status of Falcon"""
DONE = 0
ACQUIRING = 1
class DeadTimeCorrectedCounts(Signal):
def __init__(self, name:str, channel:int, **kwargs):
super().__init__(name=name, **kwargs)
self._channel = channel
self._dead_time = 1.182e-7
def get(self) -> float:
dxp:EpicsDXPFalconSuperXAS = getattr(self.parent, f"dxp{self._channel}")
mca:EpicsMCARecordSuperXAS = getattr(self.parent, f"mca{self._channel}")
icr = dxp.input_count_rate.get()
ocr = dxp.output_count_rate.get()
roi = mca.rois.roi0.count.get()
ert = mca.elapsed_real_time.get()
print(icr,ocr,roi,ert)
if icr == 0 or ocr ==0:
return 0
# Check that relative change is large enough
test = 1e9
test_icr = icr
n = 0
while test > self._dead_time and n < 30:
try:
true_icr = icr*np.exp(test_icr * self._dead_time)
test = (true_icr - test_icr) / test_icr
test_icr = true_icr
n +=1
except Exception as e:
logger.info(f"Error in computation of signal {self.name}")
return 0
# Return corrected roi counts
cor_roi_cnts = 0
if ocr *ert != 0:
cor_roi_cnts = roi * true_icr / (ocr * ert)
return cor_roi_cnts
class EpicsDXPFalconSuperXAS(EpicsDXPFalcon):
_default_read_attrs = ['input_count_rate', 'output_count_rate']
input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", kind=Kind.normal, auto_monitor=True)
output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", kind=Kind.normal, auto_monitor=True)
class EpicsMCARecordSuperXAS(EpicsMCARecord):
_default_read_attrs = ['rois']
elapsed_real_time = Cpt(EpicsSignalRO, ".ERTM", kind=Kind.normal, auto_monitor=True)
rois = DCpt(add_rois(range(0, 3), kind=Kind.normal), kind=Kind.normal)
# Consider rewriting add_rois from ophyd.mca what is normal and what is config
class FalconControl(Falcon):
""" Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
_default_read_attrs = ['mca1', 'dxp1', "dead_time_cor_cnts1"]
# DXP parameters
dxp1 = Cpt(EpicsDXPFalconSuperXAS, "dxp1:")
# dxp2 = Cpt(EpicsDXPFalconSuperXAS, "dxp2:")
# dxp3 = Cpt(EpicsDXPFalconSuperXAS, "dxp3:")
# dxp4 = Cpt(EpicsDXPFalconSuperXAS, "dxp4:")
# dxp5 = Cpt(EpicsDXPFalconSuperXAS, "dxp5:")
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecordSuperXAS, "mca1")
# mca2 = Cpt(EpicsMCARecord, "mca2")
# mca3 = Cpt(EpicsMCARecord, "mca3")
# mca4 = Cpt(EpicsMCARecord, "mca4")
# mca5 = Cpt(EpicsMCARecord, "mca5")
#Norm Signal
dead_time_cor_cnts1 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=1, kind=Kind.hinted)
# dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal)
# dead_time_cor_cnts3 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=3, kind=Kind.normal)
# dead_time_cor_cnts4 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=4, kind=Kind.normal)
# dead_time_cor_cnts5 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=5, kind=Kind.normal)
class FalconSuperXAS(PSIDeviceBase, FalconControl):
""" Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
########################################
# Beamline Specific Implementations #
########################################
def __init__(self, name: str, prefix:str='',scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._pv_timeout = 1
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self.collect_mode.set(0).wait()
self.preset_real_time.set(0).wait()
self.stop_all.put(1)
self.wait_for_condition(lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout)
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
self.stop_all.put(1)
self.erase_all.put(1)
self.wait_for_condition(lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout)
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_all.put(1)
def _stop_erase_and_wait_for_acquiring(self) -> DeviceStatus:
"""Method called from the Trigger card to reset counts on the Falcon"""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
self.stop_all.put(1)
def _check_acquiriting(*, old_value, value, **kwargs):
if old_value == FalconAcquiringStatus.DONE and value == FalconAcquiringStatus.ACQUIRING:
return True
return False
status = SubscriptionStatus(self.acquiring, _check_acquiriting)
logger.info("Triggering Falcon")
self.erase_start.put(1)
return status