From 345807f761dc019f712274b39ce93afe442e03ba Mon Sep 17 00:00:00 2001 From: gac-x10da Date: Thu, 20 Mar 2025 16:39:00 +0100 Subject: [PATCH] feat: add falcon integration --- superxas_bec/devices/falcon.py | 179 +++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 superxas_bec/devices/falcon.py diff --git a/superxas_bec/devices/falcon.py b/superxas_bec/devices/falcon.py new file mode 100644 index 0000000..66a8fb7 --- /dev/null +++ b/superxas_bec/devices/falcon.py @@ -0,0 +1,179 @@ +from ophyd_devices.devices.dxp import Falcon, EpicsMCARecord, EpicsDXPFalcon +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd import DeviceStatus, StatusBase, EpicsSignalRO, Kind, Signal, Component as Cpt +from ophyd.device import DynamicDeviceComponent as DCpt +from ophyd.mca import add_rois +from ophyd.status import SubscriptionStatus +from bec_lib.devicemanager import ScanInfo +import enum +import math +import numpy as np + +from bec_lib.logger import bec_logger + +logger = bec_logger.logger + +class FalconAcquiringStatus(int, enum.Enum): + """ Status of Falcon""" + DONE = 0 + ACQUIRING = 1 + + +class DeadTimeCorrectedCounts(Signal): + + def __init__(self, name:str, channel:int, **kwargs): + super().__init__(name=name, **kwargs) + self._channel = channel + self._dead_time = 1.182e-7 + + def get(self) -> float: + dxp:EpicsDXPFalconSuperXAS = getattr(self.parent, f"dxp{self._channel}") + mca:EpicsMCARecordSuperXAS = getattr(self.parent, f"mca{self._channel}") + + icr = dxp.input_count_rate.get() + ocr = dxp.output_count_rate.get() + roi = mca.rois.roi0.count.get() + ert = mca.elapsed_real_time.get() + print(icr,ocr,roi,ert) + + if icr == 0 or ocr ==0: + return 0 + + # Check that relative change is large enough + test = 1e9 + test_icr = icr + n = 0 + while test > self._dead_time and n < 30: + try: + true_icr = icr*np.exp(test_icr * self._dead_time) + test = (true_icr - test_icr) / test_icr + test_icr = true_icr + n +=1 + except Exception as e: + logger.info(f"Error in computation of signal {self.name}") + return 0 + + # Return corrected roi counts + cor_roi_cnts = 0 + if ocr *ert != 0: + cor_roi_cnts = roi * true_icr / (ocr * ert) + return cor_roi_cnts + + + +class EpicsDXPFalconSuperXAS(EpicsDXPFalcon): + + _default_read_attrs = ['input_count_rate', 'output_count_rate'] + + input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", kind=Kind.normal, auto_monitor=True) + output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", kind=Kind.normal, auto_monitor=True) + +class EpicsMCARecordSuperXAS(EpicsMCARecord): + + _default_read_attrs = ['rois'] + + elapsed_real_time = Cpt(EpicsSignalRO, ".ERTM", kind=Kind.normal, auto_monitor=True) + rois = DCpt(add_rois(range(0, 3), kind=Kind.normal), kind=Kind.normal) + # Consider rewriting add_rois from ophyd.mca what is normal and what is config + + +class FalconControl(Falcon): + """ Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'""" + + _default_read_attrs = ['mca1', 'dxp1', "dead_time_cor_cnts1"] + + # DXP parameters + dxp1 = Cpt(EpicsDXPFalconSuperXAS, "dxp1:") + # dxp2 = Cpt(EpicsDXPFalconSuperXAS, "dxp2:") + # dxp3 = Cpt(EpicsDXPFalconSuperXAS, "dxp3:") + # dxp4 = Cpt(EpicsDXPFalconSuperXAS, "dxp4:") + # dxp5 = Cpt(EpicsDXPFalconSuperXAS, "dxp5:") + + + # MCA record with spectrum data + mca1 = Cpt(EpicsMCARecordSuperXAS, "mca1") + # mca2 = Cpt(EpicsMCARecord, "mca2") + # mca3 = Cpt(EpicsMCARecord, "mca3") + # mca4 = Cpt(EpicsMCARecord, "mca4") + # mca5 = Cpt(EpicsMCARecord, "mca5") + + #Norm Signal + dead_time_cor_cnts1 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=1, kind=Kind.hinted) + # dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal) + # dead_time_cor_cnts3 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=3, kind=Kind.normal) + # dead_time_cor_cnts4 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=4, kind=Kind.normal) + # dead_time_cor_cnts5 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=5, kind=Kind.normal) + +class FalconSuperXAS(PSIDeviceBase, FalconControl): + """ Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'""" + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def __init__(self, name: str, prefix:str='',scan_info: ScanInfo | None = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self._pv_timeout = 1 + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + self.collect_mode.set(0).wait() + self.preset_real_time.set(0).wait() + self.stop_all.put(1) + self.wait_for_condition(lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout) + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + self.stop_all.put(1) + self.erase_all.put(1) + self.wait_for_condition(lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout) + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.stop_all.put(1) + + def _stop_erase_and_wait_for_acquiring(self) -> DeviceStatus: + """Method called from the Trigger card to reset counts on the Falcon""" + + if self.acquiring.get() != FalconAcquiringStatus.DONE: + self.stop_all.put(1) + + def _check_acquiriting(*, old_value, value, **kwargs): + if old_value == FalconAcquiringStatus.DONE and value == FalconAcquiringStatus.ACQUIRING: + return True + return False + status = SubscriptionStatus(self.acquiring, _check_acquiriting) + + logger.info("Triggering Falcon") + self.erase_start.put(1) + return status \ No newline at end of file