Compare commits

..

2 Commits

Author SHA1 Message Date
x01da 4a00d3baeb Automatic backup triggered by new deployment
CI for debye_bec / test (push) Successful in 58s
2026-06-02 13:29:38 +02:00
x01da 52d97b2d29 fix: mo1_bragg calculator
CI for debye_bec / test (push) Successful in 57s
CI for debye_bec / test (pull_request) Failing after 38s
2026-06-02 13:24:14 +02:00
4 changed files with 18 additions and 136 deletions
-124
View File
@@ -1,124 +0,0 @@
"""FALCON device implementation for SuperXAS"""
from __future__ import annotations
import enum
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices import AsyncSignal, CompareStatus, DeviceStatus, StatusBase
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class FalconControl(Falcon):
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
# DXP parameters
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecord, "mca1")
# Image record
image = Cpt(ImagePlugin, "image1:")
class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
data = Cpt(
AsyncSignal,
name="data",
ndim=1,
max_size=1000,
doc="1D Waveform data from Falcon detector.",
)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
self._pv_timeout = 1
self._falcon_energy_channels = None
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.image.unique_id.subscribe(self._on_new_data_received, run=False)
def on_stage(self) -> CompareStatus:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> CompareStatus:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
########################################
# Custom Methods #
########################################
def _on_new_data_received(self, value: int, old_value: int, **kwargs):
"""Callback for image unique ID updates to trigger preview update."""
if value == old_value:
return # No new image, or counter reset
try:
# Get new image data
array_data = self.image.array_data.get()
if array_data is None:
logger.info(f"No image data available for preview of {self.name}")
return
if self._falcon_energy_channels is None:
# Initialize energy channels based on the first received data
self._falcon_energy_channels = len(array_data)
logger.info(f"Initialized Falcon energy channels to {self._falcon_energy_channels}")
# Geometry correction for the image
self.data.put(
array_data,
async_update={"type": "add", "max_shape": [None, self._falcon_energy_channels]},
)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error while updating preview for {self.name} on image update: {content}")
+13 -7
View File
@@ -416,25 +416,31 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Returns:
output (float): Converted angle or energy
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
self.calculator.calc_reset.put(0)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
in_signal = self.calculator.calc_angle
out_signal = self.calculator.calc_energy
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
in_signal = self.calculator.calc_energy
out_signal = self.calculator.calc_angle
else:
raise Mo1BraggError(f'Unknown mode {mode}')
in_signal.put(inp)
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25) # TODO needed still? Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
status = CompareStatus(out_signal, 0, operation_success='>')
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
return out_signal.get()
def set_advanced_xas_settings(
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
+5 -5
View File
@@ -545,11 +545,11 @@ class Pilatus(PSIDeviceBase, ADBase):
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
# Camera settings
self.cam.num_exposures.set(1).wait(5)
self.cam.num_images.set(self.n_images).wait(5)
self.cam.acquire_time.set(detector_exp_time).wait(5) # let's try this
self.cam.acquire_period.set(exp_time).wait(5)
self.filter_number.set(0).wait(5)
self.cam.num_exposures.set(1, timeout=5).wait()
self.cam.num_images.set(self.n_images, timeout=5).wait()
self.cam.acquire_time.set(detector_exp_time, timeout=5).wait() # let's try this
self.cam.acquire_period.set(exp_time, timeout=5).wait()
self.filter_number.set(0, timeout=5).wait()
# HDF5 settings
logger.debug(
f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}"