fix: fixes from tests at the beamline

This commit is contained in:
x01da
2026-05-22 12:36:03 +02:00
committed by appel_c
parent 8bc36ed6a2
commit 87758710d9
7 changed files with 36 additions and 33 deletions
+4 -5
View File
@@ -103,9 +103,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_parameters.msg.scan_name
start = self.scan_parameters.additional_scan_parameters.get("start", None)
stop = self.scan_parameters.additional_scan_parameters.get("stop", None)
scan_name = self.scan_parameters.scan_name
start, stop = self.scan_parameters.positions or (None, None)
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
if scan_name == "xas_simple_scan":
@@ -140,7 +139,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None)
exp_time = self.scan_parameters.exp_time
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
if any(
param is None
@@ -216,7 +215,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None)
exp_time = self.scan_parameters.exp_time
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
if any(
param is None
+5 -1
View File
@@ -2,8 +2,12 @@
from bec_lib.devicemanager import ScanInfo
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
import numpy as np
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
"""Fetch the scan parameters from the scan_info object and return them as a ScanServerScanInfo object."""
return ScanServerScanInfo.model_validate(scan_info.msg.info)
info = scan_info.msg.info
if isinstance(info["positions"], list):
info["positions"] = np.array(info["positions"])
return ScanServerScanInfo.model_validate(info)
+1 -1
View File
@@ -1,4 +1,4 @@
from .nidaq_cont_scan import NIDAQContinuousScan
# from .nidaq_cont_scan import NIDAQContinuousScan
from .nidaq_continuous_scan import NidaqContinuousScan
from .xas_simple_scan import (
XasAdvancedScan,
+2 -2
View File
@@ -40,7 +40,7 @@ class NIDAQContinuousScan(AsyncFlyScanBase):
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.monitored_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
@@ -78,7 +78,7 @@ class NIDAQContinuousScan(AsyncFlyScanBase):
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
time.sleep(self.monitored_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
+4 -4
View File
@@ -44,8 +44,8 @@ class NidaqContinuousScan(ScanBase):
self,
#fmt: off
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")],
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")],
daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")] = None,
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")]= False,
#fmt: on
**kwargs,
):
@@ -67,7 +67,7 @@ class NidaqContinuousScan(ScanBase):
self.scan_duration = scan_duration
self.daq = daq or self.dev["nidaq"]
self.compression = compression
self.primary_readout_cycle = 1.0 # seconds
self.monitored_readout_cycle = 1.0 # seconds
self.motors = [self.daq]
self.update_scan_info(scan_duration=scan_duration, compression=compression)
@@ -130,7 +130,7 @@ class NidaqContinuousScan(ScanBase):
while not complete_status.done:
self.at_each_point()
time.sleep(self.primary_readout_cycle)
time.sleep(self.monitored_readout_cycle)
@scan_hook
def at_each_point(self):
+19 -19
View File
@@ -32,19 +32,19 @@ class XasSimpleScan(ScanBase):
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
}
def __init__(
self,
#fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0,)] = 1,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0)] = 1,
#fmt: on
**kwargs,
):
@@ -58,7 +58,7 @@ class XasSimpleScan(ScanBase):
scan_duration (float): Total scan duration.
motor (DeviceBase | None): Bragg motor device.
daq (DeviceBase | None): NIDAQ device.
primary_readout_cycle (float): Delay between monitored readouts.
monitored_readout_cycle (float): Delay between monitored readouts.
Returns:
ScanReport
@@ -70,7 +70,7 @@ class XasSimpleScan(ScanBase):
self.scan_duration = scan_duration
self.motor = motor if motor is not None else self.dev["mo1_bragg"]
self.daq = daq if daq is not None else self.dev["nidaq"]
self.primary_readout_cycle = primary_readout_cycle
self.monitored_readout_cycle = monitored_readout_cycle
self.positions = np.array([self.start, self.stop], dtype=float)
# We pass on the arguments as "additional_scan_parameters" in the scan info
@@ -78,7 +78,7 @@ class XasSimpleScan(ScanBase):
positions=self.positions,
scan_time=scan_time,
scan_duration=scan_duration,
primary_readout_cycle=primary_readout_cycle,
monitored_readout_cycle=monitored_readout_cycle,
)
self.actions.set_device_readout_priority([self.daq], priority="async")
@@ -142,7 +142,7 @@ class XasSimpleScan(ScanBase):
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
time.sleep(self.primary_readout_cycle)
time.sleep(self.monitored_readout_cycle)
@scan_hook
def post_scan(self):
@@ -178,7 +178,7 @@ class XasSimpleScanWithXrd(XasSimpleScan):
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
@@ -201,7 +201,7 @@ class XasSimpleScanWithXrd(XasSimpleScan):
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
@@ -212,7 +212,7 @@ class XasSimpleScanWithXrd(XasSimpleScan):
scan_duration=scan_duration,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
@@ -233,7 +233,7 @@ class XasAdvancedScan(XasSimpleScan):
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
}
@@ -248,7 +248,7 @@ class XasAdvancedScan(XasSimpleScan):
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
@@ -259,7 +259,7 @@ class XasAdvancedScan(XasSimpleScan):
scan_duration=scan_duration,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
@@ -270,7 +270,7 @@ class XasAdvancedScanWithXrd(XasAdvancedScan):
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
@@ -296,7 +296,7 @@ class XasAdvancedScanWithXrd(XasAdvancedScan):
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
@@ -309,7 +309,7 @@ class XasAdvancedScanWithXrd(XasAdvancedScan):
e_kink=e_kink,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
@@ -24,7 +24,7 @@ def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
"scan_duration": 10.0,
"motor": "mo1_bragg",
"daq": "nidaq",
"primary_readout_cycle": 1.0,
"monitored_readout_cycle": 1.0,
}
params.update(overrides)
return v4_scan_assembler("xas_simple_scan", **params)