feat: add exafs with k step scan

This commit is contained in:
gac-x10da
2025-03-20 16:41:55 +01:00
committed by appel_c
parent fda08595c5
commit 205f513fe0
2 changed files with 121 additions and 0 deletions

View File

@@ -0,0 +1 @@
from .exafs_scan import EXAFSScan

View File

@@ -0,0 +1,120 @@
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import ScanBase
from bec_lib.logger import bec_logger
import time
import numpy as np
logger = bec_logger.logger
class EXAFSScan(ScanBase):
scan_name = "exafs_scan"
def __init__(
self,
edge_energy : float,
xas_rel_range : list[float] | np.ndarray[float] | None = None,
n_points : list[int] | np.ndarray[int] | None = None,
k_step : list[bool] | np.ndarray[bool] | None = None,
integ_time : list[float] | np.ndarray[float] | None = None,
motor: DeviceBase | None = None,
settling_time: float = 0.2,
**kwargs,
):
"""
EXAFS Scan of the mono_energy axix
Args:
edge_energy (float) : Adsorption Edge Energy
xas_rel_range (list[float] | np.ndarray[int] | None) : Optinoal, relative range for XAS, Length of list must n_points +1
n_points (list[int] | np.ndarray[bool] | None) : Optional, number of points per range
...#TODO docstring
"""
self.edge_energy = edge_energy
self.xas_rel_range = xas_rel_range
self.n_points = n_points
self.k_step = k_step
self.integ_time = integ_time
self.k_step_conversion = 3.81
self._check_and_upated_input_arguments()
if motor is None:
default_motor = "kb_slit_y"
motor = default_motor # TODO Remove that motor, put energy of mono
self.motor = motor
super().__init__(exp_time=0, relative=False, settling_time=settling_time, **kwargs)
# Check that trigger device is enabled
_dev_trigger_name = "trigger"
self._dev_trigger:DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None)
if self._dev_trigger is None or self._dev_trigger.enabled == False:
raise ValueError(f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}")
#update scan parameters
self.scan_parameters['edge_energy'] = self.edge_energy
self.scan_parameters['xas_rel_range'] = self.xas_rel_range
self.scan_parameters['n_points'] = self.n_points
self.scan_parameters['k_step'] = self.k_step
self.scan_parameters['integ_time'] = self.integ_time
#update readout_priority
self.readout_priority = {"monitored" : [self.motor]}
def update_scan_motors(self):
self.scan_motors = [self.motor]
def _check_and_upated_input_arguments(self) -> None:
"""
If any of xas_rel_range, n_points, k_step or integ_time is None,
this method will compute to a default behaviour for the value.
"""
if not all(
[
len(self.n_points) == len(self.k_step),
len(self.n_points) == len(self.integ_time),
len(self.n_points) == (len(self.xas_rel_range) - 1) ,# carefule -1
]
):
raise ValueError("Wrong length for bla") #TODO add better error handling
self.integ_time = np.repeat(np.array(self.integ_time), np.array(self.n_points))
def _set_position_offset(self):
""" Do not set offset"""
yield None
def _calculate_positions(self):
positions = []
for ii, pnts in enumerate(self.n_points):
if self.k_step[ii] is False:
positions.extend(np.linspace(self.xas_rel_range[ii], self.xas_rel_range[ii+1], pnts, endpoint=False).tolist())
else:
k_start = np.sqrt(self.xas_rel_range[ii]/self.k_step_conversion)
k_stop = np.sqrt(self.xas_rel_range[ii+1]/self.k_step_conversion)
k_pos = np.linspace(k_start, k_stop, pnts, endpoint=False)
k_pos = k_pos**2 * self.k_step_conversion
positions.extend(k_pos.tolist())
# Create positions array
self.positions = np.vstack(positions)
# shift by edge energy
self.positions = self.positions + self.edge_energy
# Convert to keV
self.positions = self.positions/1e3
def _at_each_point(self, ind=None, pos=None):
yield from self._move_scan_motors_and_wait(pos)
time.sleep(self.settling_time)
trigger_time = self.integ_time[ind]
self.stubs.send_rpc_and_wait(self._dev_trigger, "set_exposure_time", trigger_time)
# Trigger
yield from self.stubs.trigger(min_wait=trigger_time)
#Readout all monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
#Increase point id
self.point_id += 1