refactor: fix formatting

This commit is contained in:
2025-06-20 09:19:51 +02:00
parent b3672cf5f5
commit abf432f2a9
7 changed files with 75 additions and 46 deletions

View File

@@ -16,14 +16,12 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import ProgressSignal from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typeguard import typechecked from typeguard import typechecked
from ophyd_devices import TransitionStatus, CompareStatus
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import # pylint: disable=unused-import
@@ -219,7 +217,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if self.stopped is True: if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.") logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False self._stopped = False
if self.scan_control.scan_msg.get() in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]: if self.scan_control.scan_msg.get() in [
ScanControlLoadMessage.STARTED,
ScanControlLoadMessage.SUCCESS,
]:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status) self.cancel_on_stop(status)
try: try:
@@ -227,8 +228,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
return None return None
except WaitTimeoutError: except WaitTimeoutError:
logger.warning( logger.warning(
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}" f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
) )
else: else:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status) self.cancel_on_stop(status)
@@ -258,11 +259,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
else self.scan_control.scan_start_timer.put else self.scan_control.scan_start_timer.put
) )
status = TransitionStatus( status = TransitionStatus(
self.scan_control.scan_status, self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING], transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True, strict=True,
raise_states=[ScanControlScanStatus.PARAMETER_WRONG], raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
) )
self.cancel_on_stop(status) self.cancel_on_stop(status)
start_func(1) start_func(1)
return status return status

View File

@@ -94,7 +94,8 @@ class Mo1BraggCrystal(Device):
current_xtal_string = Cpt( current_xtal_string = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
) )
class Mo1BraggScanSettings(Device): class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings""" """Mo1 Bragg PVs to set the scan setttings"""

View File

@@ -7,12 +7,10 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import ProgressSignal from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices import CompareStatus, TransitionStatus
from debye_bec.devices.nidaq.nidaq_enums import ( from debye_bec.devices.nidaq.nidaq_enums import (
EncoderFactors, EncoderFactors,
NIDAQCompression, NIDAQCompression,
@@ -502,13 +500,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Called after the device is connected and its signals are connected. Called after the device is connected and its signals are connected.
Default values for signals should be set here. Default values for signals should be set here.
""" """
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False) status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status) self.cancel_on_stop(status)
try: try:
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
except WaitTimeoutError: except WaitTimeoutError:
logger.warning(f"Device {self.name} was not alive, trying to put power on") logger.warning(f"Device {self.name} was not alive, trying to put power on")
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False) status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status) self.cancel_on_stop(status)
self.power.put(1) self.power.put(1)
@@ -589,7 +587,6 @@ class Nidaq(PSIDeviceBase, NidaqControl):
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan") logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None return None
status = CompareStatus(self.state, NidaqState.KICKOFF) status = CompareStatus(self.state, NidaqState.KICKOFF)
self.cancel_on_stop(status) self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv) status.wait(timeout=self._timeout_wait_for_pv)

View File

@@ -1 +1 @@
from .debye_nexus_structure import DebyeNexusStructure from .debye_nexus_structure import DebyeNexusStructure

View File

@@ -1,10 +1,11 @@
from bec_server.file_writer.default_writer import DefaultFormat from bec_server.file_writer.default_writer import DefaultFormat
class DebyeNexusStructure(DefaultFormat): class DebyeNexusStructure(DefaultFormat):
""" Nexus Structure for Debye""" """Nexus Structure for Debye"""
def format(self) -> None: def format(self) -> None:
""" Specify the file format for the file writer.""" """Specify the file format for the file writer."""
entry = self.storage.create_group(name="entry") entry = self.storage.create_group(name="entry")
entry.attrs["NX_class"] = "NXentry" entry.attrs["NX_class"] = "NXentry"
@@ -17,81 +18,108 @@ class DebyeNexusStructure(DefaultFormat):
# Logic if device exist # Logic if device exist
if "mo1_bragg" in self.device_manager.devices: if "mo1_bragg" in self.device_manager.devices:
monochromator = instrument.create_group(name="monochromator") monochromator = instrument.create_group(name="monochromator")
monochromator.attrs["NX_class"] = "NXmonochromator" monochromator.attrs["NX_class"] = "NXmonochromator"
crystal = monochromator.create_group(name="crystal") crystal = monochromator.create_group(name="crystal")
crystal.attrs["NX_class"] = "NXcrystal" crystal.attrs["NX_class"] = "NXcrystal"
# Create a dataset # Create a dataset
chemical_formular = crystal.create_dataset(name="chemical_formular", data='Si') chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
chemical_formular.attrs["NX_class"] = "NX_CHAR" chemical_formular.attrs["NX_class"] = "NX_CHAR"
# Create a softlink # Create a softlink
d_spacing = crystal.create_soft_link(name="d_spacing", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value") d_spacing = crystal.create_soft_link(
name="d_spacing",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
)
d_spacing.attrs["NX_class"] = "NX_FLOAT" d_spacing.attrs["NX_class"] = "NX_FLOAT"
offset = crystal.create_soft_link(name="offset", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value") offset = crystal.create_soft_link(
name="offset",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
)
offset.attrs["NX_class"] = "NX_FLOAT" offset.attrs["NX_class"] = "NX_FLOAT"
reflection = crystal.create_soft_link(name="reflection", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value") reflection = crystal.create_soft_link(
name="reflection",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
)
reflection.attrs["NX_class"] = "NX_CHAR" reflection.attrs["NX_class"] = "NX_CHAR"
################## ##################
## cm mirror specific information ## cm mirror specific information
################### ###################
collimating_mirror = instrument.create_group(name="collimating_mirror") collimating_mirror = instrument.create_group(name="collimating_mirror")
collimating_mirror.attrs["NX_class"] = "NXmirror" collimating_mirror.attrs["NX_class"] = "NXmirror"
cm_substrate_material = collimating_mirror.create_dataset(name="substrate_material", data='Si') cm_substrate_material = collimating_mirror.create_dataset(
name="substrate_material", data="Si"
)
cm_substrate_material.attrs["NX_class"] = "NX_CHAR" cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
cm_bending_radius = collimating_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value") cm_bending_radius = collimating_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
)
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT" cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
cm_bending_radius.attrs["units"] = 'km' cm_bending_radius.attrs["units"] = "km"
cm_incidence_angle = collimating_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value") cm_incidence_angle = collimating_mirror.create_soft_link(
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
)
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT" cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle = collimating_mirror.create_soft_link(name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value") cm_yaw_angle = collimating_mirror.create_soft_link(
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT" name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
)
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
################## ##################
## fm mirror specific information ## fm mirror specific information
################### ###################
focusing_mirror = instrument.create_group(name="focusing_mirror") focusing_mirror = instrument.create_group(name="focusing_mirror")
focusing_mirror.attrs["NX_class"] = "NXmirror" focusing_mirror.attrs["NX_class"] = "NXmirror"
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data='Si') fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
fm_substrate_material.attrs["NX_class"] = "NX_CHAR" fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
fm_bending_radius = focusing_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value") fm_bending_radius = focusing_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
)
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT" fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
fm_incidence_angle = focusing_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value") fm_incidence_angle = focusing_mirror.create_soft_link(
name="incidence angle",
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
)
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT" fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
fm_yaw_angle = focusing_mirror.create_soft_link(name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value") fm_yaw_angle = focusing_mirror.create_soft_link(
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
)
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT" fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
fm_roll_angle = focusing_mirror.create_soft_link(name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value") fm_roll_angle = focusing_mirror.create_soft_link(
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
)
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT" fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
################## ##################
## source specific information ## source specific information
################### ###################
source = instrument.create_group(name="source") source = instrument.create_group(name="source")
source.attrs["NX_class"] = "NXsource" source.attrs["NX_class"] = "NXsource"
beamline_name = source.create_dataset(name="beamline_name", data='Debye') beamline_name = source.create_dataset(name="beamline_name", data="Debye")
beamline_name.attrs["NX_class"] = "NX_CHAR" beamline_name.attrs["NX_class"] = "NX_CHAR"
facility_name = source.create_dataset(name="facility_name", data='Swiss Light Source') facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
facility_name.attrs["NX_class"] = "NX_CHAR" facility_name.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data='X-ray') probe = source.create_dataset(name="probe", data="X-ray")
probe.attrs["NX_class"] = "NX_CHAR" probe.attrs["NX_class"] = "NX_CHAR"

View File

@@ -1,6 +1,6 @@
#from .metadata_schema_xas_simple_scan import xas_simple_scan_schema # from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = {#"xas_simple_scan": xas_simple_scan_schema METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here. # Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above # Make a model according to the template, and import it as above
# Then associate it with a scan like so: # Then associate it with a scan like so:

View File

@@ -1,4 +1,6 @@
from bec_lib.metadata_schema import BasicScanMetadata from bec_lib.metadata_schema import BasicScanMetadata
# #
# #
class xas_simple_scan_schema(BasicScanMetadata): class xas_simple_scan_schema(BasicScanMetadata):