diff --git a/superxas_bec/scans/__init__.py b/superxas_bec/scans/__init__.py index e69de29..95eb5ae 100644 --- a/superxas_bec/scans/__init__.py +++ b/superxas_bec/scans/__init__.py @@ -0,0 +1 @@ +from .exafs_scan import EXAFSScan \ No newline at end of file diff --git a/superxas_bec/scans/exafs_scan.py b/superxas_bec/scans/exafs_scan.py new file mode 100644 index 0000000..0ec7ca4 --- /dev/null +++ b/superxas_bec/scans/exafs_scan.py @@ -0,0 +1,120 @@ +from bec_lib.device import DeviceBase +from bec_server.scan_server.scans import ScanBase +from bec_lib.logger import bec_logger +import time +import numpy as np + +logger = bec_logger.logger + + +class EXAFSScan(ScanBase): + scan_name = "exafs_scan" + + def __init__( + self, + edge_energy : float, + xas_rel_range : list[float] | np.ndarray[float] | None = None, + n_points : list[int] | np.ndarray[int] | None = None, + k_step : list[bool] | np.ndarray[bool] | None = None, + integ_time : list[float] | np.ndarray[float] | None = None, + motor: DeviceBase | None = None, + settling_time: float = 0.2, + **kwargs, + ): + """ + EXAFS Scan of the mono_energy axix + + Args: + edge_energy (float) : Adsorption Edge Energy + xas_rel_range (list[float] | np.ndarray[int] | None) : Optinoal, relative range for XAS, Length of list must n_points +1 + n_points (list[int] | np.ndarray[bool] | None) : Optional, number of points per range + ...#TODO docstring + """ + self.edge_energy = edge_energy + self.xas_rel_range = xas_rel_range + self.n_points = n_points + self.k_step = k_step + self.integ_time = integ_time + self.k_step_conversion = 3.81 + self._check_and_upated_input_arguments() + if motor is None: + default_motor = "kb_slit_y" + motor = default_motor # TODO Remove that motor, put energy of mono + self.motor = motor + super().__init__(exp_time=0, relative=False, settling_time=settling_time, **kwargs) + + # Check that trigger device is enabled + _dev_trigger_name = "trigger" + self._dev_trigger:DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None) + if self._dev_trigger is None or self._dev_trigger.enabled == False: + raise ValueError(f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}") + #update scan parameters + self.scan_parameters['edge_energy'] = self.edge_energy + self.scan_parameters['xas_rel_range'] = self.xas_rel_range + self.scan_parameters['n_points'] = self.n_points + self.scan_parameters['k_step'] = self.k_step + self.scan_parameters['integ_time'] = self.integ_time + #update readout_priority + self.readout_priority = {"monitored" : [self.motor]} + + def update_scan_motors(self): + self.scan_motors = [self.motor] + + + def _check_and_upated_input_arguments(self) -> None: + """ + If any of xas_rel_range, n_points, k_step or integ_time is None, + this method will compute to a default behaviour for the value. + """ + + if not all( + [ + len(self.n_points) == len(self.k_step), + len(self.n_points) == len(self.integ_time), + len(self.n_points) == (len(self.xas_rel_range) - 1) ,# carefule -1 + ] + ): + raise ValueError("Wrong length for bla") #TODO add better error handling + self.integ_time = np.repeat(np.array(self.integ_time), np.array(self.n_points)) + + def _set_position_offset(self): + """ Do not set offset""" + yield None + + def _calculate_positions(self): + positions = [] + for ii, pnts in enumerate(self.n_points): + if self.k_step[ii] is False: + positions.extend(np.linspace(self.xas_rel_range[ii], self.xas_rel_range[ii+1], pnts, endpoint=False).tolist()) + else: + k_start = np.sqrt(self.xas_rel_range[ii]/self.k_step_conversion) + k_stop = np.sqrt(self.xas_rel_range[ii+1]/self.k_step_conversion) + k_pos = np.linspace(k_start, k_stop, pnts, endpoint=False) + k_pos = k_pos**2 * self.k_step_conversion + positions.extend(k_pos.tolist()) + + # Create positions array + self.positions = np.vstack(positions) + # shift by edge energy + self.positions = self.positions + self.edge_energy + # Convert to keV + self.positions = self.positions/1e3 + + def _at_each_point(self, ind=None, pos=None): + yield from self._move_scan_motors_and_wait(pos) + time.sleep(self.settling_time) + trigger_time = self.integ_time[ind] + self.stubs.send_rpc_and_wait(self._dev_trigger, "set_exposure_time", trigger_time) + + # Trigger + yield from self.stubs.trigger(min_wait=trigger_time) + + #Readout all monitored devices + yield from self.stubs.read(group="monitored", point_id=self.point_id) + + #Increase point id + self.point_id += 1 + + + + \ No newline at end of file