Compare commits

..

4 Commits

Author SHA1 Message Date
hitz_s 1a20282499 Merge pull request 'fix: update info storage with num_monitored_readouts' (#79) from fix/scan_info_readouts into main
CI for debye_bec / test (push) Successful in 57s
Reviewed-on: #79
2026-06-03 13:15:21 +02:00
wakonig_k 1a6eb5ab90 fix: update info storage with num_monitored_readouts
CI for debye_bec / test (push) Successful in 1m4s
CI for debye_bec / test (pull_request) Successful in 1m1s
2026-06-03 13:10:38 +02:00
hitz_s ea23ab2284 Merge pull request 'fix: mo1_bragg calculator' (#78) from fix-mono-calculator into main
CI for debye_bec / test (push) Successful in 54s
Reviewed-on: #78
2026-06-02 13:25:27 +02:00
x01da 52d97b2d29 fix: mo1_bragg calculator
CI for debye_bec / test (push) Successful in 57s
CI for debye_bec / test (pull_request) Failing after 38s
2026-06-02 13:24:14 +02:00
4 changed files with 14 additions and 131 deletions
-124
View File
@@ -1,124 +0,0 @@
"""FALCON device implementation for SuperXAS"""
from __future__ import annotations
import enum
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices import AsyncSignal, CompareStatus, DeviceStatus, StatusBase
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class FalconControl(Falcon):
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
# DXP parameters
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecord, "mca1")
# Image record
image = Cpt(ImagePlugin, "image1:")
class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
data = Cpt(
AsyncSignal,
name="data",
ndim=1,
max_size=1000,
doc="1D Waveform data from Falcon detector.",
)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
self._pv_timeout = 1
self._falcon_energy_channels = None
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.image.unique_id.subscribe(self._on_new_data_received, run=False)
def on_stage(self) -> CompareStatus:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> CompareStatus:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
########################################
# Custom Methods #
########################################
def _on_new_data_received(self, value: int, old_value: int, **kwargs):
"""Callback for image unique ID updates to trigger preview update."""
if value == old_value:
return # No new image, or counter reset
try:
# Get new image data
array_data = self.image.array_data.get()
if array_data is None:
logger.info(f"No image data available for preview of {self.name}")
return
if self._falcon_energy_channels is None:
# Initialize energy channels based on the first received data
self._falcon_energy_channels = len(array_data)
logger.info(f"Initialized Falcon energy channels to {self._falcon_energy_channels}")
# Geometry correction for the image
self.data.put(
array_data,
async_update={"type": "add", "max_shape": [None, self._falcon_energy_channels]},
)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error while updating preview for {self.name} on image update: {content}")
+13 -7
View File
@@ -416,25 +416,31 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Returns:
output (float): Converted angle or energy
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
self.calculator.calc_reset.put(0)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
in_signal = self.calculator.calc_angle
out_signal = self.calculator.calc_energy
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
in_signal = self.calculator.calc_energy
out_signal = self.calculator.calc_angle
else:
raise Mo1BraggError(f'Unknown mode {mode}')
in_signal.put(inp)
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25) # TODO needed still? Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
status = CompareStatus(out_signal, 0, operation_success='>')
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
return out_signal.get()
def set_advanced_xas_settings(
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
+1
View File
@@ -13,6 +13,7 @@ def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
info = scan_info.msg.info
if isinstance(info["positions"], list):
info["positions"] = np.array(info["positions"])
info["num_monitored_readouts"] = scan_info.msg.num_monitored_readouts
try:
msg = ScanServerScanInfo.model_validate(info)
except ValidationError: # This means we have an old scan_info object.