From 61c36903763324ee1bd38267eeb6bd5f20d267d5 Mon Sep 17 00:00:00 2001 From: appel_c Date: Sat, 22 Mar 2025 11:28:43 +0100 Subject: [PATCH] refactor: cleanup and formatter on exafs_scan --- superxas_bec/scans/exafs_scan.py | 96 +++++++++++---------- tests/tests_devices/test_devices_falcon.py | 2 +- tests/tests_devices/test_devices_trigger.py | 1 + 3 files changed, 51 insertions(+), 48 deletions(-) diff --git a/superxas_bec/scans/exafs_scan.py b/superxas_bec/scans/exafs_scan.py index 0ec7ca4..ff70d64 100644 --- a/superxas_bec/scans/exafs_scan.py +++ b/superxas_bec/scans/exafs_scan.py @@ -1,25 +1,26 @@ -from bec_lib.device import DeviceBase -from bec_server.scan_server.scans import ScanBase -from bec_lib.logger import bec_logger import time -import numpy as np -logger = bec_logger.logger +import numpy as np +from bec_lib.device import DeviceBase +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans import ScanBase + +logger = bec_logger.logger class EXAFSScan(ScanBase): scan_name = "exafs_scan" def __init__( - self, - edge_energy : float, - xas_rel_range : list[float] | np.ndarray[float] | None = None, - n_points : list[int] | np.ndarray[int] | None = None, - k_step : list[bool] | np.ndarray[bool] | None = None, - integ_time : list[float] | np.ndarray[float] | None = None, - motor: DeviceBase | None = None, - settling_time: float = 0.2, - **kwargs, + self, + edge_energy: float, + xas_rel_range: list[float] | np.ndarray[float] | None = None, + n_points: list[int] | np.ndarray[int] | None = None, + k_step: list[bool] | np.ndarray[bool] | None = None, + integ_time: list[float] | np.ndarray[float] | None = None, + motor: DeviceBase | None = None, + settling_time: float = 0.2, + **kwargs, ): """ EXAFS Scan of the mono_energy axix @@ -39,67 +40,72 @@ class EXAFSScan(ScanBase): self._check_and_upated_input_arguments() if motor is None: default_motor = "kb_slit_y" - motor = default_motor # TODO Remove that motor, put energy of mono + motor = default_motor # TODO Remove that motor, put energy of mono self.motor = motor super().__init__(exp_time=0, relative=False, settling_time=settling_time, **kwargs) # Check that trigger device is enabled _dev_trigger_name = "trigger" - self._dev_trigger:DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None) + self._dev_trigger: DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None) if self._dev_trigger is None or self._dev_trigger.enabled == False: - raise ValueError(f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}") - #update scan parameters - self.scan_parameters['edge_energy'] = self.edge_energy - self.scan_parameters['xas_rel_range'] = self.xas_rel_range - self.scan_parameters['n_points'] = self.n_points - self.scan_parameters['k_step'] = self.k_step - self.scan_parameters['integ_time'] = self.integ_time - #update readout_priority - self.readout_priority = {"monitored" : [self.motor]} + raise ValueError( + f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}" + ) + # update scan parameters + self.scan_parameters["edge_energy"] = self.edge_energy + self.scan_parameters["xas_rel_range"] = self.xas_rel_range + self.scan_parameters["n_points"] = self.n_points + self.scan_parameters["k_step"] = self.k_step + self.scan_parameters["integ_time"] = self.integ_time + # update readout_priority + self.readout_priority = {"monitored": [self.motor]} def update_scan_motors(self): self.scan_motors = [self.motor] - def _check_and_upated_input_arguments(self) -> None: - """ - If any of xas_rel_range, n_points, k_step or integ_time is None, + """ + If any of xas_rel_range, n_points, k_step or integ_time is None, this method will compute to a default behaviour for the value. """ if not all( [ - len(self.n_points) == len(self.k_step), + len(self.n_points) == len(self.k_step), len(self.n_points) == len(self.integ_time), - len(self.n_points) == (len(self.xas_rel_range) - 1) ,# carefule -1 + len(self.n_points) == (len(self.xas_rel_range) - 1), # carefule -1 ] - ): - raise ValueError("Wrong length for bla") #TODO add better error handling + ): + raise ValueError("Wrong length for bla") # TODO add better error handling self.integ_time = np.repeat(np.array(self.integ_time), np.array(self.n_points)) def _set_position_offset(self): - """ Do not set offset""" + """Do not set offset""" yield None - + def _calculate_positions(self): positions = [] for ii, pnts in enumerate(self.n_points): if self.k_step[ii] is False: - positions.extend(np.linspace(self.xas_rel_range[ii], self.xas_rel_range[ii+1], pnts, endpoint=False).tolist()) + positions.extend( + np.linspace( + self.xas_rel_range[ii], self.xas_rel_range[ii + 1], pnts, endpoint=False + ).tolist() + ) else: - k_start = np.sqrt(self.xas_rel_range[ii]/self.k_step_conversion) - k_stop = np.sqrt(self.xas_rel_range[ii+1]/self.k_step_conversion) + k_start = np.sqrt(self.xas_rel_range[ii] / self.k_step_conversion) + k_stop = np.sqrt(self.xas_rel_range[ii + 1] / self.k_step_conversion) k_pos = np.linspace(k_start, k_stop, pnts, endpoint=False) k_pos = k_pos**2 * self.k_step_conversion positions.extend(k_pos.tolist()) - + # Create positions array - self.positions = np.vstack(positions) + self.positions = np.vstack(positions) # shift by edge energy self.positions = self.positions + self.edge_energy # Convert to keV - self.positions = self.positions/1e3 - + self.positions = self.positions / 1e3 + def _at_each_point(self, ind=None, pos=None): yield from self._move_scan_motors_and_wait(pos) time.sleep(self.settling_time) @@ -109,12 +115,8 @@ class EXAFSScan(ScanBase): # Trigger yield from self.stubs.trigger(min_wait=trigger_time) - #Readout all monitored devices + # Readout all monitored devices yield from self.stubs.read(group="monitored", point_id=self.point_id) - #Increase point id + # Increase point id self.point_id += 1 - - - - \ No newline at end of file diff --git a/tests/tests_devices/test_devices_falcon.py b/tests/tests_devices/test_devices_falcon.py index 8febb10..edea7f0 100644 --- a/tests/tests_devices/test_devices_falcon.py +++ b/tests/tests_devices/test_devices_falcon.py @@ -5,12 +5,12 @@ from unittest import mock import ophyd import pytest -from ophyd import DeviceStatus from ophyd_devices.tests.utils import MockPV, patch_dual_pvs from superxas_bec.devices.falcon import FalconAcquiringStatus, FalconSuperXAS # pylint: disable=protected-access +# pylint: disable=redefined-outer-name @pytest.fixture(scope="function") diff --git a/tests/tests_devices/test_devices_trigger.py b/tests/tests_devices/test_devices_trigger.py index a8ab305..6da34ea 100644 --- a/tests/tests_devices/test_devices_trigger.py +++ b/tests/tests_devices/test_devices_trigger.py @@ -12,6 +12,7 @@ from ophyd_devices.tests.utils import MockPV, patch_dual_pvs from superxas_bec.devices.trigger import ContinuousSamplingMode, Trigger # pylint: disable=protected-access +# pylint: disable=redefined-outer-name @pytest.fixture(scope="function")