refactor(falcon): cleanup falcon integration, remove extra classes, move scripts.

This commit is contained in:
2025-06-26 14:03:51 +02:00
parent f70445f783
commit a650759ae7
11 changed files with 380 additions and 891 deletions

View File

@@ -0,0 +1,50 @@
import time
from ophyd_devices import CompareStatus
from superxas_bec.devices.falcon_ad import FalconAcquiringStatus, FalconAD
from superxas_bec.devices.trigger import SamplingDone, Trigger
if __name__ == "__main__":
print("Initializing Falcon...")
time_started = time.time()
falcon = FalconAD(name="test_device", prefix="X10DA-SITORO:", scan_info=None)
trigger = Trigger(name="trigger_device", prefix="X10DA-ES1:", scan_info=None)
falcon.wait_for_connection(timeout=50, all_signals=True)
trigger.wait_for_connection(timeout=50, all_signals=True)
print(f"Device initialized in {time.time() - time_started:.2f} seconds.")
if falcon.acquiring.get != FalconAcquiringStatus.DONE:
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status.wait(timeout=5)
# Test loop
for i in range(500):
start_time = time.time()
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.ACQUIRING)
falcon.erase_start.put(1)
status.wait(timeout=5)
status2 = CompareStatus(trigger.smpl_done, SamplingDone.DONE)
trigger.smpl.put(1)
status2.wait(timeout=5)
status3 = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status3.wait(timeout=5)
time.sleep(0.1)
icr = falcon.dxp1.input_count_rate.get()
ocr = falcon.dxp1.output_count_rate.get()
roi = falcon.mca1.rois.count.get()
ert = falcon.mca1.elapsed_real_time.get()
dead_time_corrected_signal = falcon.dead_time_corrected_signal.get()
print(
f"time={time.time() - start_time:.4f}", icr, ocr, roi, ert, dead_time_corrected_signal
)

50
bin/test_falcon.py Normal file
View File

@@ -0,0 +1,50 @@
import time
from ophyd_devices import CompareStatus
from superxas_bec.devices.falcon import FalconAcquiringStatus, FalconSuperXAS
from superxas_bec.devices.trigger import SamplingDone, Trigger
if __name__ == "__main__":
print("Initializing Falcon...")
time_started = time.time()
falcon = FalconSuperXAS(name="test_device", prefix="X10DA-SITORO:", scan_info=None)
trigger = Trigger(name="trigger_device", prefix="X10DA-ES1:", scan_info=None)
falcon.wait_for_connection(timeout=50, all_signals=True)
trigger.wait_for_connection(timeout=50, all_signals=True)
print(f"Device initialized in {time.time() - time_started:.2f} seconds.")
if falcon.acquiring.get != FalconAcquiringStatus.DONE:
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status.wait(timeout=5)
# Test loop
for i in range(500):
start_time = time.time()
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.ACQUIRING)
falcon.erase_start.put(1)
status.wait(timeout=5)
status2 = CompareStatus(trigger.smpl_done, SamplingDone.DONE)
trigger.smpl.put(1)
status2.wait(timeout=5)
status3 = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status3.wait(timeout=5)
time.sleep(0.1)
icr = falcon.icr.get()
ocr = falcon.ocr.get()
roi = falcon.roi.get()
ert = falcon.ert.get()
dead_time_corrected_signal = falcon.dead_time_corrected_signal.get()
print(
f"time={time.time() - start_time:.4f}", icr, ocr, roi, ert, dead_time_corrected_signal
)

View File

@@ -1,18 +1,14 @@
"""FALCON device implementation for SuperXAS"""
"""Module for Falcon detector at SuperXAS."""
import enum
import time
import queue
import threading
import numpy as np
from bec_lib.devicemanager import ScanInfo
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import DeviceMessage
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, Signal, Staged, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, SignalRO
from ophyd_devices import CompareStatus, DeviceStatus, PreviewSignal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
logger = bec_logger.logger
@@ -25,110 +21,137 @@ class FalconAcquiringStatus(int, enum.Enum):
ACQUIRING = 1
def compute_dead_time_corrected_signal(icr: float, ocr: float, roi: float, ert: float):
_dead_time = 1.182e-7
if icr == 0 or ocr == 0:
return 0
class DeadTimeCorrectedSignal(SignalRO):
"""Signal for dead time corrected counts."""
# Check that relative change is large enough
test = 1e9
test_icr = icr
n = 0
while test > _dead_time and n < 30:
try:
true_icr = icr * np.exp(test_icr * _dead_time)
test = (true_icr - test_icr) / test_icr
test_icr = true_icr
n += 1
except Exception as e: # pylint: disable=broad-except
logger.info(f"Error in computation of deadtime corrected signal, error: {e}")
def __init__(self, *args, parent: Device | None = None, **kwargs):
super().__init__(*args, **kwargs)
self.parent = parent
self._dead_time = 1.182e-7 # Dead time in seconds
def get(self, **kwargs) -> float | None:
icr = self.parent.icr.get()
ocr = self.parent.ocr.get()
roi = self.parent.roi.get()
ert = self.parent.ert.get()
return self.compute_deadtime_corrected_signal(icr, ocr, roi, ert)
def compute_deadtime_corrected_signal(
self, icr: float, ocr: float, roi: float, ert: float
) -> float:
"""Method to compute dead time corrected signal."""
if icr == 0 or ocr == 0:
return 0
# Return corrected roi counts
cor_roi_cnts = 0
if ocr * ert != 0:
cor_roi_cnts = roi * true_icr / (ocr * ert)
return cor_roi_cnts
# Check that relative change is large enough
test = 1e9
test_icr = icr
n = 0
while test > self._dead_time and n < 30:
try:
true_icr = icr * np.exp(test_icr * self._dead_time)
test = (true_icr - test_icr) / test_icr
test_icr = true_icr
n += 1
except Exception as e: # pylint: disable=broad-except
logger.info(f"Error in computation of deadtime corrected signal, error: {e}")
return 0
# Return corrected roi counts
cor_roi_cnts = 0
if ocr * ert != 0:
cor_roi_cnts = roi * true_icr / (ocr * ert)
return cor_roi_cnts
class FalconControl(Falcon):
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
class FalconControl(Device):
"""Falcon Control class for SuperXAS. prefix: 'X10DA-SITORO:'"""
# _default_read_attrs = Falcon._default_read_attrs + (
# "dxp1",
# # # "dxp2",
# "mca1",
# # # "mca2",
# "dead_time_cor_cnts1",
# # # "dead_time_cor_cnts2",
# )
# _default_configuration_attrs = Falcon._default_configuration_attrs + (
# "dxp1",
# # "dxp2",
# "mca1",
# # "mca2",
# "dead_time_cor_cnts1",
# # "dead_time_cor_cnts2",
# )
# PVs for Falcon control
erase_start: EpicsSignal = Cpt(
EpicsSignal, "EraseStart", kind=Kind.omitted, doc="XMAP start signal"
)
stop_all: EpicsSignal = Cpt(EpicsSignal, "StopAll", kind=Kind.omitted, doc="XMAP stop signal")
acquiring: EpicsSignalRO = Cpt(
EpicsSignalRO,
"Acquiring",
kind=Kind.omitted,
auto_monitor=True,
doc="XMAP acquiring signal",
)
# PVs for Signals, auto_monitors are active here
icr: EpicsSignalRO = Cpt(
EpicsSignalRO,
"dxp1:InputCountRate",
kind=Kind.normal,
auto_monitor=True,
doc="XMAP input count rate",
)
ocr: EpicsSignalRO = Cpt(
EpicsSignalRO,
"dxp1:OutputCountRate",
kind=Kind.normal,
auto_monitor=True,
doc="XMAP output count rate",
)
ert: EpicsSignalRO = Cpt(
EpicsSignalRO,
"mca1.ERTM",
kind=Kind.normal,
auto_monitor=True,
doc="XMAP elapsed real time",
)
roi: EpicsSignalRO = Cpt(
EpicsSignalRO, "mca1.R0", kind=Kind.normal, auto_monitor=True, doc="XMAP ROI signal"
)
label: EpicsSignalRO = Cpt(EpicsSignalRO, "mca1.R0NM", kind=Kind.config, doc="XMAP ROI label")
spectrum_val: EpicsSignalRO = Cpt(
EpicsSignalRO, "mca1.VAL", kind=Kind.omitted, doc="XMAP spectrum signal"
)
# DXP parameters
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
# dxp2 = Cpt(EpicsDXPFalcon, "dxp2:")
# Configuration attributes
collect_mode: EpicsSignal = Cpt(EpicsSignal, "CollectMode", doc="Collect mode signal")
preset_real_time: EpicsSignal = Cpt(
EpicsSignal, "PresetRealTime", doc="Preset real time signal"
)
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecord, "mca1")
# mca2 = Cpt(EpicsMCARecord, "mca2")
# Preview Signal for Falcon detector
spectrum: PreviewSignal = Cpt(
PreviewSignal,
name="spectrum",
ndim=1,
kind=Kind.omitted,
auto_monitor=True,
doc="Preview signal for Falcon detector spectrum",
)
# dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal)
# Computed signal for dead time corrected counts
dead_cor_roi0_count = Cpt(
DeadTimeCorrectedSignal,
name="dead_cor_roi0_count",
kind=Kind.hinted,
doc="Dead time corrected ROI 0 count",
)
class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
"""Slim Falcon implementation at SuperXAS. prefix: 'X10DA-SITORO:'"""
_default_read_attrs = ("icr", "ocr", "elap_real_time", "roi0_count", "dead_cor_roi0_count")
_default_configuration_attrs = None
preview = Cpt(
Signal, name="preview", kind=Kind.omitted, doc="Preview signal for Falcon detector"
)
icr = Cpt(Signal, name="icr", kind=Kind.normal)
ocr = Cpt(Signal, name="icr", kind=Kind.normal)
elap_real_time = Cpt(Signal, name="icr", kind=Kind.normal)
roi0_count = Cpt(Signal, name="icr", kind=Kind.normal)
dead_cor_roi0_count = Cpt(
Signal,
name="dead_cor_roi0_count",
doc="Async dead time corrected ROI 0 count signal",
kind=Kind.normal,
)
########################################
# Beamline Specific Implementations #
########################################
def __init__(
self,
name: str,
prefix: str = "",
scan_info: ScanInfo | None = None,
device_manager=None,
**kwargs,
):
"""
Initialize Falcon device.
Args:
name (str): Name of the device
prefix (str): Prefix of the device
scan_info (ScanInfo): Information about the scan
**kwargs: Additional keyword arguments
"""
def __init__(self, *, name, prefix="", scan_info=None, device_manager=None, **kwargs):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.device_manager = device_manager
self.mca1.stage_sigs = {}
self._pv_timeout = 5
self._pv_timeout = 5 # seconds
self._dead_time = 1.182e-7 # Dead time in seconds
self.update_queue: queue.Queue = queue.Queue()
self.current_values: dict = {}
self.update_thread: threading.Thread | None = None
self.threading_event = threading.Event() # Event to control the update thread
self.r_lock: threading.RLock = threading.RLock() # Lock for thread safety
#########################################
#### Custom beamline specific methods ###
#########################################
def on_init(self) -> None:
"""
@@ -138,27 +161,34 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
set default values on signals, please use on_connected instead.
"""
def on_destroy(self):
"""
Called when the device is destroyed.
This method can be used to clean up resources or stop threads.
"""
if self.update_thread and self.update_thread.is_alive():
self.threading_event.set()
self.update_queue.put((None, None)) # Signal the thread to stop
self.update_thread.join(timeout=5) # Wait for the thread to finish
if self.update_thread.is_alive():
logger.warning(f"Update thread for device {self.name} did not finish in 5s.")
self.update_thread = None
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Upon being connected, make sure the Falcon is not acquiring.
"""
def stage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore
"""Stage the device."""
if self.staged != Staged.no:
return [self]
self.stopped = False
status = self.on_stage() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
return status
return [self]
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
self.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
Information about the upcoming scan can be accessed self.scan_info.msg.
"""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
logger.info(f"Falcon state was {self.acquiring.get()} during stage. Calling stop_all")
@@ -167,15 +197,20 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
self.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
self.collect_mode.set(0).wait(timeout=self._pv_timeout)
self.preset_real_time.set(0).wait(timeout=self._pv_timeout)
status_list = []
status_list.append(self.collect_mode.set(0))
status_list.append(self.preset_real_time.set(0))
for status in status_list:
self.cancel_on_stop(status)
for status in status_list:
status.wait(timeout=self._pv_timeout)
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
self.stop_all.put(1)
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
self.stop_all.put(1)
return status
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
@@ -193,38 +228,3 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_all.put(1)
def send_data(self):
"""
Wait until the Falcon is done acquiring data.
This method blocks until the acquiring status is DONE.
"""
time_started = time.time()
# CompareStatus(self.acquiring, FalconAcquiringStatus.DONE).wait(self._pv_timeout)
logger.info(f"Sending data for {self.name} at {time_started}")
icr = self.dxp1.input_count_rate.get()
ocr = self.dxp1.output_count_rate.get()
roi = self.mca1.rois.count.get()
ert = self.mca1.elapsed_real_time.get()
self.icr.put(icr)
logger.info(f"Data to plot {self.icr.get()}")
self.ocr.put(ocr)
self.elap_real_time.put(ert)
self.roi0_count.put(roi)
self.dead_cor_roi0_count.put(compute_dead_time_corrected_signal(icr, ocr, roi, ert))
# self._send_preview_async()
logger.info(f"Data sent for {self.name} at {time.time()- time_started}")
def _send_preview_async(self) -> None:
metadata = {"async_update": {"type": "add", "max_shape": [None, 3000]}}
data = {self.preview.name: {"value": self.mca1.spectrum.get(), "timestamp": time.time()}}
msg = DeviceMessage(signals=data, metadata=metadata)
self.device_manager.connector.xadd(
MessageEndpoints.device_async_readback(
scan_id=self.scan_info.msg.scan_id, device=self.name
),
msg_dict={"data": msg},
max_size=1000,
expire=900,
)

View File

@@ -0,0 +1,130 @@
"""FALCON device implementation for SuperXAS"""
import enum
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Staged, StatusBase
from ophyd_devices import CompareStatus
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class FalconADControl(Falcon):
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
# DXP parameters
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecord, "mca1")
class FalconAD(PSIDeviceBase, FalconADControl):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
########################################
# Beamline Specific Implementations #
########################################
def __init__(
self,
name: str,
prefix: str = "",
scan_info: ScanInfo | None = None,
device_manager=None,
**kwargs,
):
"""
Initialize Falcon device.
Args:
name (str): Name of the device
prefix (str): Prefix of the device
scan_info (ScanInfo): Information about the scan
**kwargs: Additional keyword arguments
"""
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.device_manager = device_manager
self._pv_timeout = 5
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
self.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
# TODO Skip AD stage command (trigger_value) for now!
def stage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore
"""Stage the device."""
if self.staged != Staged.no:
return [self]
self.stopped = False
status = self.on_stage() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
return status
return [self]
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
logger.info(f"Falcon state was {self.acquiring.get()} during stage. Calling stop_all")
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
self.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
self.collect_mode.set(0).wait(timeout=self._pv_timeout)
self.preset_real_time.set(0).wait(timeout=self._pv_timeout)
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
self.stop_all.put(1)
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
return status
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_all.put(1)

View File

@@ -1,276 +0,0 @@
import enum
import time
from collections import OrderedDict
from typing import TYPE_CHECKING
import numpy as np
from bec_lib.devicemanager import ScanInfo
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import DeviceMessage
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as DDC
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind, Signal
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
PreviewSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class DXPControl(Device):
"""DXP Control Device for Falcon detector"""
input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", kind=Kind.omitted)
output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", kind=Kind.omitted)
elapsed_real_time = Cpt(EpicsSignalRO, "ElapsedRealTime", kind=Kind.omitted)
class MCAControl(Device):
"""MCA Control Device for Falcon detector"""
spectrum = Cpt(EpicsSignalRO, ".VAL", kind=Kind.omitted)
roi0_count = Cpt(EpicsSignalRO, ".R0", kind=Kind.omitted)
roi0_label = Cpt(EpicsSignal, ".R0NM", kind=Kind.omitted)
elapsed_real_time = Cpt(EpicsSignalRO, ".ERTM", kind=Kind.omitted)
class DeadTimeCorrectedCounts(Signal):
"""Signal to calculate dead time corrected counts"""
def __init__(self, name: str, channel: int, **kwargs):
"""
Initialize DeadTimeCorrectedCounts signal.
Args:
name (str): Name of the signal
channel (int): Channel number
"""
super().__init__(name=name, **kwargs)
self._channel = channel
self._dead_time = 1.182e-7
# pylint: disable=arguments-differ
def get(self) -> float:
"""Get dead time corrected counts base on signals from dxp and mca of Falcon"""
dxp: DXPControl = getattr(self.parent, f"dxp{self._channel}")
mca: MCAControl = getattr(self.parent, f"mca{self._channel}")
icr = dxp.input_count_rate.get()
ocr = dxp.output_count_rate.get()
roi = mca.roi_count.get()
ert = mca.elapsed_real_time.get()
# print(icr, ocr, roi, ert)
if icr == 0 or ocr == 0:
return 0
# Check that relative change is large enough
test = 1e9
test_icr = icr
n = 0
while test > self._dead_time and n < 30:
try:
true_icr = icr * np.exp(test_icr * self._dead_time)
test = (true_icr - test_icr) / test_icr
test_icr = true_icr
n += 1
except Exception as e: # pylint: disable=broad-except
logger.info(f"Error in computation of signal {self.name}, error: {e}")
return 0
# Return corrected roi counts
cor_roi_cnts = 0
if ocr * ert != 0:
cor_roi_cnts = roi * true_icr / (ocr * ert)
return cor_roi_cnts
class FalconControlDirect(Device):
"""Stripped implementation of Falcon detector for SuperXAS; prefix: 'X10DA-SITORO:'"""
# Acquisition control
erase_all = Cpt(EpicsSignal, "EraseAll", kind=Kind.omitted)
erase_start = Cpt(EpicsSignal, "EraseStart", put_complete=True, kind=Kind.omitted)
start_all = Cpt(EpicsSignal, "StartAll", put_complete=True, kind=Kind.omitted)
stop_all = Cpt(EpicsSignal, "StopAll", kind=Kind.omitted)
collect_mode = Cpt(EpicsSignalWithRBV, "CollectMode", kind=Kind.config)
preset_real_time = Cpt(EpicsSignal, "PresetReal", kind=Kind.config)
acquiring = Cpt(EpicsSignal, "Acquiring", kind=Kind.omitted, auto_monitor=True)
# DXP parameters
dxp1 = Cpt(DXPControl, "dxp1:", kind=Kind.omitted)
# dxp2 = Cpt(EpicsDXPFalcon, "dxp2:")
# MCA record with spectrum data
mca1 = Cpt(MCAControl, "mca1", kind=Kind.omitted)
# mca2 = Cpt(EpicsMCARecord, "mca2")
# Norm Signal
dead_time_cor_cnts1 = Cpt(
DeadTimeCorrectedCounts, name="dead_time_cor_cnts", channel=1, kind=Kind.omitted
)
# dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal)
class FalconSuperXASDirect(PSIDeviceBase, FalconControlDirect):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
preview = Cpt(
Signal, name="preview", kind=Kind.omitted, doc="Preview signal for Falcon detector"
)
icr = Cpt(
Signal, name="icr", kind=Kind.normal)
ocr = Cpt(Signal, name="icr", kind=Kind.normal)
elap_real_time = Cpt(Signal, name="icr", kind=Kind.normal)
roi0_count = Cpt(Signal, name="icr", kind=Kind.normal)
dead_cor_roi0_count = Cpt(
Signal,
name="dead_cor_roi0_count",
doc="Async dead time corrected ROI 0 count signal",
kind=Kind.normal
)
########################################
# Beamline Specific Implementations #
########################################
def __init__(
self,
name: str,
prefix: str = "",
scan_info: ScanInfo | None = None,
device_manager: DeviceManagerDS | None = None,
**kwargs,
):
"""
Initialize Falcon device.
Args:
name (str): Name of the device
prefix (str): Prefix of the device
scan_info (ScanInfo): Information about the scan
**kwargs: Additional keyword arguments
"""
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.dm = device_manager
self._pv_timeout = 5
self._async_read_data = [
"dxp_input_count_rate",
"dxp_output_count_rate",
"mca_elapsed_real_time",
"mca_rois_roi0_count",
]
self._index = 0
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# self.stop_all.put(1)
# CompareStatus(self.acquiring, FalconAcquiringStatus.DONE).wait(5)
# self.mca1.spectrum.subscribe(self._update_preview, run=False)
# TODO add again once PreviewSIgnal works with GUI
# def _update_preview(self, old_value, value, **kwargs) -> None:
# """Update the preview signal with the latest spectrum data."""
# logger.info(f"Received new spectrum data: {value}")
# self.preview.put(value)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
# self.stop_all.put(1, use_complete=True)
if self.acquiring.get() == FalconAcquiringStatus.ACQUIRING:
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.stop_all.set(1).wait(self._pv_timeout)
try:
status.wait(self._pv_timeout)
except Exception as exc:
logger.warning(f"Device {self.name} failed to reach state 'done', current state {FalconAcquiringStatus(self.acquiring.get())}")
self.collect_mode.set(0).wait()
self.preset_real_time.set(0).wait()
self._index = 0
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
self.stop_all.set(1).wait(self._pv_timeout)
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_all.set(1).wait(self._pv_timeout)
def send_data(self):
"""
Wait until the Falcon is done acquiring data.
This method blocks until the acquiring status is DONE.
"""
time_started = time.time()
# CompareStatus(self.acquiring, FalconAcquiringStatus.DONE).wait(self._pv_timeout)
logger.info(f"Sending data for {self.name} at {time_started}")
self.icr.put(self.dxp1.input_count_rate.get())
logger.info(f"Data to plot {self.icr.get()}")
self.ocr.put(self.dxp1.output_count_rate.get())
self.elap_real_time.put(self.mca1.elapsed_real_time.get())
self.roi0_count.put(self.mca1.roi_count.get())
# self.dead_cor_roi0_count.put(self.dead_time_cor_cnts1.get())
self._send_preview_async()
logger.info(f"Data sent for {self.name} at {time.time()- time_started}")
def _send_preview_async(self) -> None:
metadata = {"async_update": {"type": "add", "max_shape": [None, 3000]}}
data = {self.preview.name: {"value": self.mca1.spectrum.get(), "timestamp": time.time()}}
msg = DeviceMessage(signals=data, metadata=metadata)
self.dm.connector.xadd(
MessageEndpoints.device_async_readback(
scan_id=self.scan_info.msg.scan_id, device=self.name
),
msg_dict={"data": msg},
max_size=1000,
expire=900,
)
self._index += 1

View File

@@ -1,150 +0,0 @@
import enum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd_devices import CompareStatus, DeviceStatus, PreviewSignal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class FalconControlSlim(Device):
"""Slim Falcon Control class for SuperXAS. prefix: 'X10DA-SITORO:'"""
erase_start:EpicsSignal = Cpt(EpicsSignal, "EraseStart", kind=Kind.omitted, doc="XMAP start signal")
stop_all:EpicsSignal = Cpt(EpicsSignal, "StopAll", kind=Kind.omitted,doc="XMAP stop signal")
acquiring:EpicsSignalRO = Cpt(EpicsSignalRO, "Acquiring", kind=Kind.omitted, auto_monitor=True, doc="XMAP acquiring signal")
icr:EpicsSignalRO = Cpt(EpicsSignalRO, "dxp1:InputCountRate", doc="XMAP input count rate")
ocr:EpicsSignalRO = Cpt(EpicsSignalRO, "dxp1:OutputCountRate", doc="XMAP output count rate")
ert:EpicsSignalRO = Cpt(EpicsSignalRO, "mca1.ERTM", doc="XMAP elapsed real time")
roi:EpicsSignalRO = Cpt(EpicsSignalRO, "mca1.R0", kind=Kind.hinted, doc="XMAP ROI signal")
label:EpicsSignalRO = Cpt(EpicsSignalRO, "mca1.R0NM", kind=Kind.config, doc="XMAP ROI signal")
spectrum_val:EpicsSignalRO = Cpt(EpicsSignalRO, "mca1.VAL", kind=Kind.omitted)
# Preview Signal for Falcon detector
spectrum = Cpt(
PreviewSignal, name="spectrum", ndim=1, doc="Preview signal for Falcon detector spectrum"
)
# Configuration attributes
collect_mode:EpicsSignal = Cpt(EpicsSignal, "CollectMode", doc="Collect mode signal")
preset_real:EpicsSignal = Cpt(EpicsSignal, "PresetReal", doc="Preset real time signal")
# Computed signal for dead time corrected counts
dead_cor_roi0_count = Cpt(Signal, name="dead_cor_roi0_count", doc="Dead time corrected ROI 0 count")
def compute_dead_time_corrected_signal(icr: float, ocr: float, roi: float, ert: float):
_dead_time = 1.182e-7
if icr == 0 or ocr == 0:
return 0
# Check that relative change is large enough
test = 1e9
test_icr = icr
n = 0
while test > _dead_time and n < 30:
try:
true_icr = icr * np.exp(test_icr * _dead_time)
test = (true_icr - test_icr) / test_icr
test_icr = true_icr
n += 1
except Exception as e: # pylint: disable=broad-except
logger.info(f"Error in computation of deadtime corrected signal, error: {e}")
return 0
# Return corrected roi counts
cor_roi_cnts = 0
if ocr * ert != 0:
cor_roi_cnts = roi * true_icr / (ocr * ert)
return cor_roi_cnts
class FalconSuperXASSlim(PSIDeviceBase, FalconControlSlim):
"""Slim Falcon implementation at SuperXAS. prefix: 'X10DA-SITORO:'"""
def __init__(self, *, name, prefix="", scan_info=None, device_manager=None, **kwargs):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._pv_timeout = 5 # seconds
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
logger.info(f"Falcon state was {self.acquiring.get()} during stage. Calling stop_all")
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
self.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
self.collect_mode.set(0).wait(timeout=self._pv_timeout)
self.preset_real.set(0).wait(timeout=self._pv_timeout)
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
self.stop_all.put(1)
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE)
self.cancel_on_stop(status)
return status
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_all.put(1)
def update_data(self):
"""
Set the dead time corrected signal based on input count rate, output count rate, ROI count, and elapsed real time.
Parameters:
icr (float): Input count rate.
ocr (float): Output count rate.
roi (float): ROI count.
ert (float): Elapsed real time.
"""
dead_time_corrected_signal = compute_dead_time_corrected_signal(
self.icr.get(), self.ocr.get(), self.roi.get(), self.ert.get()
)
self.dead_cor_roi0_count.put(dead_time_corrected_signal)
self.spectrum.put(self.spectrum_val.get())

View File

@@ -1,97 +0,0 @@
import time
import epics
import numpy as np
from ophyd_devices import CompareStatus, TransitionStatus
from superxas_bec.devices.falcon_direct import FalconAcquiringStatus, FalconControlDirect as FalconSuperXAS
from superxas_bec.devices.trigger import ContinuousSamplingMode, SamplingDone, Trigger
def mock_motor_move(pos: float) -> None:
"""Mock function to simulate motor movement."""
print(f"Mock Motor starts moving...")
# time.sleep(0.5)
print(f"Mock Motor has reached the target position {pos}")
def sleep_poll(total_sleep:float):
sleep_timer = 0.01
# time.sleep(total_sleep)
for ii in range(int(total_sleep/sleep_timer)):
time.sleep(sleep_timer)
epics.poll()
if __name__ == "__main__":
# time.sleep(20) # Give time to connect pyspy
# Exposure time 0.6s
exp_time = 0.6
# steps = 10
positions = np.linspace(0, 1, 10) # Simulated positions for the motor
# Example usage of the FalconSuperXAS and Trigger classes
falcon = FalconSuperXAS(name="falcon", prefix="X10DA-SITORO:")
trigger = Trigger(name="trigger", prefix="X10DA-ES1:")
print(f"Initialized {falcon.name} with prefix {falcon.prefix}")
print(f"Initialized {trigger.name} with prefix {trigger.prefix}")
falcon.wait_for_connection(all_signals=True, timeout=60)
trigger.wait_for_connection(all_signals=True, timeout=60)
# Simulate a scan!
for iteration in range(3):
print(f"\nStarting iteration {iteration + 1} of the scan...")
#### STAGE ####
# Check if Falcon is acquiring data and stop if necessary
if falcon.acquiring.get() == FalconAcquiringStatus.ACQUIRING:
print("Falcon is currently acquiring data.")
print("Stopping all acquisitions...")
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status.wait(timeout=5)
print("All acquisitions stopped.")
# Prepare the devices
falcon.collect_mode.set(0).wait()
falcon.preset_real_time.set(0).wait()
print("Falcon is prepared for the scan.")
trigger.start_csmpl.set(ContinuousSamplingMode.OFF).wait(timeout=5)
cycles = max(int(exp_time * 5), 1) # Must be at least 1 cycle, each cycle is 0.2s
trigger.total_cycles.set(cycles).wait(timeout=5)
print(f"Trigger is prepared for the scan with {cycles} cycles.")
#### End STAGE ####
### Trigger at each point###
# Simulate motion and data acquisition at each point
print("Starting the scan...")
for pos in positions:
mock_motor_move(pos)
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.ACQUIRING)
falcon.erase_start.put(1)
print(f"Acquiring state after erase_start: {FalconAcquiringStatus(falcon.acquiring.get())}")
status.wait(timeout=5)
sleep_poll(0.4)
# time.sleep(0.4)
status_smpl = TransitionStatus(
trigger.smpl_done, [SamplingDone.RUNNING, SamplingDone.DONE]
)
trigger.smpl.put(1)
status_smpl.wait()
sleep_poll(0.4)
# time.sleep(0.4)
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.DONE)
falcon.stop_all.put(1)
status.wait(timeout=5)
sleep_poll(0.4)
# time.sleep(0.4) # Simulate some processing time
# print(falcon.mca1.rois.roi0.count.get())
print(falcon.mca1.roi0_count.get())
print(falcon.mca1.elapsed_real_time.get())
print(falcon.dxp1.input_count_rate.get())
print(falcon.dxp1.output_count_rate.get())
# print(falcon.mca1.elapsed_real_time.get())
# print(falcon.max_elapsed_live.get())
# print(falcon.max_elapsed_real.get())
sleep_poll(2)
# time.sleep(2) # #FIXME <- When removed, crashes always in second loop! Otherwise, it soemtimes works..

View File

@@ -1,61 +0,0 @@
#!/usr/bin/python
# python script
import time
from CaChannel import CaChannel, ca
I0 = CaChannel()
I0.searchw("X10DA-ES1-SAI_01:MEAN")
Trigger = CaChannel()
Trigger.searchw("X10DA-ES1:SMPL")
TriggerDone = CaChannel()
TriggerDone.searchw("X10DA-ES1:SMPL-DONE")
XMAPStart = CaChannel()
XMAPStart.searchw("X10DA-SITORO:EraseStart")
XMAPStop = CaChannel()
XMAPStop.searchw("X10DA-SITORO:StopAll")
XMAPAcquiring = CaChannel()
XMAPAcquiring.searchw("X10DA-SITORO:Acquiring")
XMAPICR = CaChannel()
XMAPICR.searchw("X10DA-SITORO:dxp1:InputCountRate")
XMAPOCR = CaChannel()
XMAPOCR.searchw("X10DA-SITORO:dxp1:OutputCountRate")
XMAPROI = CaChannel()
XMAPROI.searchw("X10DA-SITORO:mca1.R0")
for i in range(25):
start_time = time.time()
XMAPStart.putw(1)
f = 0
while f == 0:
time.sleep(0.05)
f = XMAPAcquiring.getw()
Trigger.putw(1)
time.sleep(0.2)
t = 0
while t == 0:
time.sleep(0.05)
t = TriggerDone.getw()
XMAPStop.putw(1)
f = 1
while f == 1:
time.sleep(0.05)
f = XMAPAcquiring.getw()
time.sleep(0.1)
i0 = I0.getw()
icr = XMAPICR.getw()
ocr = XMAPOCR.getw()
roi = XMAPROI.getw()
end_time = time.time()
print(f"time={end_time-start_time:.4f}", i0, icr, ocr, roi)

View File

@@ -1,82 +0,0 @@
import time
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd.status import SubscriptionStatus
from ophyd_devices import CompareStatus, TransitionStatus
class DummyDevice(Device):
"""
A dummy device for testing purposes.
"""
i_0 = Cpt(EpicsSignalRO, "ES1-SAI_01:MEAN", doc="I0 signal")
trigger_smpl = Cpt(EpicsSignal, "ES1:SMPL", doc="Trigger signal")
trigger_done = Cpt(EpicsSignalRO, "ES1:SMPL-DONE", auto_monitor=True, doc="Trigger done signal")
xmap_start = Cpt(EpicsSignal, "SITORO:EraseStart", doc="XMAP start signal")
xmap_stop = Cpt(EpicsSignal, "SITORO:StopAll", doc="XMAP stop signal")
xmap_acquiring = Cpt(
EpicsSignalRO, "SITORO:Acquiring", auto_monitor=True, doc="XMAP acquiring signal"
)
xmap_icr = Cpt(EpicsSignalRO, "SITORO:dxp1:InputCountRate", doc="XMAP input count rate")
xmap_ocr = Cpt(EpicsSignalRO, "SITORO:dxp1:OutputCountRate", doc="XMAP output count rate")
xmap_roi = Cpt(EpicsSignalRO, "SITORO:mca1.R0", doc="XMAP ROI signal")
def state_changed_callback(*, value, old_value, **kwargs):
"""Callback for acquiring signal changes."""
if old_value == 0 and value == 1 or old_value == 1 and value == 0:
print(f"State changed: {old_value} -> {value}")
return True
return False
def acquire_stoped(*, value, old_value, **kwargs):
"""Callback for acquisition stop."""
if value == 0:
print("Acquisition stopped.")
return True
return False
if __name__ == "__main__":
# Create an instance of the dummy device
print("Initializing DummyDevice...")
time_started = time.time()
device = DummyDevice(name="test_device", prefix="X10DA-")
device.wait_for_connection(timeout=50, all_signals=True)
print(f"Device initialized in {time.time() - time_started:.2f} seconds.")
# status = SubscriptionStatus(device.xmap_acquiring, acquire_stoped)
status = CompareStatus(device.xmap_acquiring, 0)
device.xmap_stop.put(1)
status.wait(timeout=5)
for i in range(500):
start_time = time.time()
# status = SubscriptionStatus(device.xmap_acquiring, state_changed_callback)
status = CompareStatus(device.xmap_acquiring, 1)
device.xmap_start.put(1)
status.wait(timeout=5)
# status2 = SubscriptionStatus(device.trigger_done, state_changed_callback)
status2 = CompareStatus(device.trigger_done, 0)
device.trigger_smpl.put(1)
status2.wait(timeout=5)
# status3 = SubscriptionStatus(device.xmap_acquiring, state_changed_callback)
status3 = CompareStatus(device.xmap_acquiring, 0)
device.xmap_stop.put(1)
status3.wait(timeout=5)
time.sleep(0.1)
i0 = device.i_0.get()
icr = device.xmap_icr.get()
ocr = device.xmap_ocr.get()
roi = device.xmap_roi.get()
print(f"time={time.time() - start_time:.4f}", i0, icr, ocr, roi)

View File

@@ -1,74 +0,0 @@
#!/usr/bin/python
# PyEpics version of the script
import time
import threading
import epics
def main():
# Define PVs
I0 = epics.PV("X10DA-ES1-SAI_01:MEAN")
Trigger = epics.PV("X10DA-ES1:SMPL")
TriggerDone = epics.PV("X10DA-ES1:SMPL-DONE", auto_monitor=True)
XMAPStart = epics.PV("X10DA-SITORO:EraseStart")
XMAPStop = epics.PV("X10DA-SITORO:StopAll")
XMAPAcquiring = epics.PV("X10DA-SITORO:Acquiring", auto_monitor=True)
XMAPICR = epics.PV("X10DA-SITORO:dxp1:InputCountRate")
XMAPOCR = epics.PV("X10DA-SITORO:dxp1:OutputCountRate")
XMAPROI = epics.PV("X10DA-SITORO:mca1.R0")
acquire_started = threading.Event()
acquire_stopped = threading.Event()
trigger_done = threading.Event()
def acquiring_cb(pvname=None, value=None, **kwargs):
if value == 1:
acquire_started.set()
elif value == 0:
acquire_stopped.set()
def trigger_done_cb(pvname=None, value=None, **kwargs):
if value == 1:
trigger_done.set()
# Wait for connections (optional)
for pv in [I0, Trigger, TriggerDone, XMAPStart, XMAPStop, XMAPAcquiring, XMAPICR, XMAPOCR, XMAPROI]:
pv.wait_for_connection(timeout=2)
TriggerDone.add_callback(trigger_done_cb)
XMAPAcquiring.add_callback(acquiring_cb)
# Measurement loop
for i in range(500):
acquire_started.clear()
acquire_stopped.clear()
trigger_done.clear()
start_time = time.time()
XMAPStart.put(1, wait=False)
acquire_started.wait(timeout=5)
Trigger.put(1, wait=False)
trigger_done.wait(3)
XMAPStop.put(1, wait=True)
acquire_stopped.wait(5)
time.sleep(0.1)
i0 = I0.get()
icr = XMAPICR.get()
ocr = XMAPOCR.get()
roi = XMAPROI.get()
end_time = time.time()
print(f"time={end_time - start_time:.4f}", i0, icr, ocr, roi)
if __name__=="__main__":
main()

View File

@@ -10,7 +10,7 @@ from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, Status
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from superxas_bec.devices.falcon_slim import FalconAcquiringStatus, FalconSuperXASSlim
from superxas_bec.devices.falcon import FalconAcquiringStatus, FalconSuperXAS
logger = bec_logger.logger
@@ -127,7 +127,7 @@ class Trigger(PSIDeviceBase, TriggerControl):
falcon = self.device_manager.devices.get("falcon", None)
if falcon is not None and falcon.enabled is True:
falcon: FalconSuperXASSlim
falcon: FalconSuperXAS
status = CompareStatus(falcon.acquiring, FalconAcquiringStatus.ACQUIRING)
self.cancel_on_stop(status)
falcon.erase_start.put(1)
@@ -146,7 +146,6 @@ class Trigger(PSIDeviceBase, TriggerControl):
falcon.stop_all.put(1)
status.wait(timeout=self._pv_timeout)
time.sleep(0.4) # Simulate some processing time
falcon.obj.update_data()
return status_smpl
def on_complete(self) -> DeviceStatus | StatusBase | None: