From c1ba279b985cb1918bafddffbc990bee6386fa4f Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 28 May 2026 17:06:56 +0200 Subject: [PATCH] refactor: cleanup, and remove readability --- superxas_bec/devices/mo1_bragg/mo1_bragg.py | 30 ++---- superxas_bec/devices/nidaq/nidaq.py | 100 ++++++++++++-------- 2 files changed, 68 insertions(+), 62 deletions(-) diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg.py b/superxas_bec/devices/mo1_bragg/mo1_bragg.py index d50ef91..ee5233c 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg.py @@ -8,29 +8,25 @@ used to ensure that the action is executed completely. This is believed to allow for a more stable execution of the action.""" import time -from typing import Any, Literal +from typing import Literal from bec_lib.devicemanager import ScanInfo from bec_lib.logger import bec_logger from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt -from ophyd import DeviceStatus, Signal, StatusBase -from ophyd.status import SubscriptionStatus, WaitTimeoutError +from ophyd import DeviceStatus, StatusBase +from ophyd.status import WaitTimeoutError from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices.utils.errors import DeviceStopError from typeguard import typechecked from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner # pylint: disable=unused-import from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import ( - MoveType, ScanControlLoadMessage, ScanControlMode, ScanControlScanStatus, - TriggerControlMode, - TriggerControlSource, ) from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline from superxas_bec.devices.utils.utils import fetch_scan_info @@ -71,9 +67,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.timeout_for_pvwait = 7.5 self.valid_scan_names = [ "xas_simple_scan", - "xas_simple_scan_with_xrd", + "xas_simple_scan_with_xrd", # Prepared for future, currently not yet available "xas_advanced_scan", - "xas_advanced_scan_with_xrd", + "xas_advanced_scan_with_xrd", # Prepared for future, currently not yet available "nidaq_continuous_scan", ] @@ -341,17 +337,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" if self.scan_parameters is None: return None - sources = [ - self.scan_parameters.additional_scan_parameters, - getattr(self.scan_parameters, "metadata", {}), - getattr(self.scan_info.msg, "scan_parameters", {}), - ] - request_inputs = self.scan_parameters.request_inputs or {} - sources.extend([request_inputs.get("inputs", {}), request_inputs.get("kwargs", {})]) - for source in sources: - for name in names: - if isinstance(source, dict) and name in source: - return source[name] + for name in names: + if name in self.scan_parameters.additional_scan_parameters: + return self.scan_parameters.additional_scan_parameters[name] return None def _get_start_stop(self): @@ -419,7 +407,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status = CompareStatus(self.calculator.calc_done, 0) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) - time.sleep(0.25) + time.sleep(0.25) # needed, otherwise there can be a timing issue with reset/calc_angle. if mode == "AngleToEnergy": self.calculator.calc_angle.put(inp) diff --git a/superxas_bec/devices/nidaq/nidaq.py b/superxas_bec/devices/nidaq/nidaq.py index 3ac1664..9216f7f 100644 --- a/superxas_bec/devices/nidaq/nidaq.py +++ b/superxas_bec/devices/nidaq/nidaq.py @@ -37,18 +37,12 @@ class NidaqControl(Device): energy = Cpt(SetableSignal, value=0, kind=Kind.normal) - smpl_abs = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption" - ) + smpl_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption") smpl_fluo = Cpt( SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample fluorescence" ) - ref_abs = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption" - ) - cisum = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum" - ) + ref_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption") + cisum = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum") ai0_mean = Cpt( SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN" @@ -364,7 +358,9 @@ class NidaqControl(Device): ### Control PVs ### - enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True) + enable_compression = Cpt( + EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True + ) # enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True) kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config) stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config) @@ -373,13 +369,29 @@ class NidaqControl(Device): compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config) scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config) scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True) - sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True) - sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True) + sampling_rate = Cpt( + EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True + ) + sampling_rate_string = Cpt( + EpicsSignal, + suffix="NIDAQ-SamplingRateRequested", + kind=Kind.config, + string=True, + auto_monitor=True, + ) scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config) - readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True) - readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True) - encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True) - encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True) + readout_range = Cpt( + EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True + ) + readout_range_string = Cpt( + EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True + ) + encoder_factor = Cpt( + EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True + ) + encoder_factor_string = Cpt( + EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True + ) stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config) power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config) heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True) @@ -394,22 +406,38 @@ class NidaqControl(Device): ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True) smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True) - smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True) + smpl_abs_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True + ) smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True) - smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True) + smpl_abs_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True + ) - smpl_fluo_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True) - smpl_fluo_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True) + smpl_fluo_no = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True + ) + smpl_fluo_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True + ) - smpl_fluo_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True) - smpl_fluo_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True) + smpl_fluo_de = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True + ) + smpl_fluo_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True + ) ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True) - ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True) + ref_abs_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True + ) ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True) - ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True) + ref_abs_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True + ) class Nidaq(PSIDeviceBase, NidaqControl): @@ -429,12 +457,14 @@ class Nidaq(PSIDeviceBase, NidaqControl): super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) self.scan_parameters: ScanServerScanInfo | None = None self.timeout_wait_for_signal = 5 # put 5s firsts - self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting + self._timeout_wait_for_pv = ( + 5 # 5s timeout for pv calls. editted due to timeout issues persisting + ) self.valid_scan_names = [ "xas_simple_scan", - "xas_simple_scan_with_xrd", + "xas_simple_scan_with_xrd", # prepared for future, currently not yet available "xas_advanced_scan", - "xas_advanced_scan_with_xrd", + "xas_advanced_scan_with_xrd", # prepared for future, currently not yet available "nidaq_continuous_scan", ] @@ -705,20 +735,8 @@ class Nidaq(PSIDeviceBase, NidaqControl): """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" if self.scan_parameters is None: return None - sources = [ - self.scan_parameters.additional_scan_parameters, - getattr(self.scan_info.msg, "scan_parameters", {}), - ] - request_inputs = self.scan_parameters.request_inputs or {} - sources.extend( - [ - request_inputs.get("inputs", {}), - request_inputs.get("kwargs", {}), - ] - ) - for source in sources: - if isinstance(source, dict) and name in source: - return source[name] + if name in self.scan_parameters.additional_scan_parameters: + return self.scan_parameters.additional_scan_parameters[name] return None def _progress_update(self, value, **kwargs) -> None: