refactor: cleanup, and remove readability
CI for superxas_bec / test (push) Successful in 1m32s
CI for superxas_bec / test (pull_request) Successful in 1m44s

This commit is contained in:
2026-05-28 17:06:56 +02:00
parent d443f228bc
commit c1ba279b98
2 changed files with 68 additions and 62 deletions
+9 -21
View File
@@ -8,29 +8,25 @@ used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from typeguard import typechecked
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
TriggerControlMode,
TriggerControlSource,
)
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
from superxas_bec.devices.utils.utils import fetch_scan_info
@@ -71,9 +67,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.timeout_for_pvwait = 7.5
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_simple_scan_with_xrd", # Prepared for future, currently not yet available
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"xas_advanced_scan_with_xrd", # Prepared for future, currently not yet available
"nidaq_continuous_scan",
]
@@ -341,17 +337,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
if self.scan_parameters is None:
return None
sources = [
self.scan_parameters.additional_scan_parameters,
getattr(self.scan_parameters, "metadata", {}),
getattr(self.scan_info.msg, "scan_parameters", {}),
]
request_inputs = self.scan_parameters.request_inputs or {}
sources.extend([request_inputs.get("inputs", {}), request_inputs.get("kwargs", {})])
for source in sources:
for name in names:
if isinstance(source, dict) and name in source:
return source[name]
for name in names:
if name in self.scan_parameters.additional_scan_parameters:
return self.scan_parameters.additional_scan_parameters[name]
return None
def _get_start_stop(self):
@@ -419,7 +407,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25)
time.sleep(0.25) # needed, otherwise there can be a timing issue with reset/calc_angle.
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
+59 -41
View File
@@ -37,18 +37,12 @@ class NidaqControl(Device):
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
smpl_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
)
smpl_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption")
smpl_fluo = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample fluorescence"
)
ref_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
)
cisum = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
)
ref_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption")
cisum = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum")
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
@@ -364,7 +358,9 @@ class NidaqControl(Device):
### Control PVs ###
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
enable_compression = Cpt(
EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True
)
# enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
@@ -373,13 +369,29 @@ class NidaqControl(Device):
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
sampling_rate = Cpt(
EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True
)
sampling_rate_string = Cpt(
EpicsSignal,
suffix="NIDAQ-SamplingRateRequested",
kind=Kind.config,
string=True,
auto_monitor=True,
)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
readout_range = Cpt(
EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True
)
readout_range_string = Cpt(
EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True
)
encoder_factor = Cpt(
EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True
)
encoder_factor_string = Cpt(
EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True
)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
@@ -394,22 +406,38 @@ class NidaqControl(Device):
ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True
)
smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True
)
smpl_fluo_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True)
smpl_fluo_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_fluo_no = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True
)
smpl_fluo_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True
)
smpl_fluo_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True)
smpl_fluo_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True)
smpl_fluo_de = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True
)
smpl_fluo_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True
)
ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True
)
ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True
)
class Nidaq(PSIDeviceBase, NidaqControl):
@@ -429,12 +457,14 @@ class Nidaq(PSIDeviceBase, NidaqControl):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_parameters: ScanServerScanInfo | None = None
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
self._timeout_wait_for_pv = (
5 # 5s timeout for pv calls. editted due to timeout issues persisting
)
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_simple_scan_with_xrd", # prepared for future, currently not yet available
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"xas_advanced_scan_with_xrd", # prepared for future, currently not yet available
"nidaq_continuous_scan",
]
@@ -705,20 +735,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
if self.scan_parameters is None:
return None
sources = [
self.scan_parameters.additional_scan_parameters,
getattr(self.scan_info.msg, "scan_parameters", {}),
]
request_inputs = self.scan_parameters.request_inputs or {}
sources.extend(
[
request_inputs.get("inputs", {}),
request_inputs.get("kwargs", {}),
]
)
for source in sources:
if isinstance(source, dict) and name in source:
return source[name]
if name in self.scan_parameters.additional_scan_parameters:
return self.scan_parameters.additional_scan_parameters[name]
return None
def _progress_update(self, value, **kwargs) -> None: