From abf432f2a920dcc54c498e81a92384eb06338420 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 20 Jun 2025 09:19:51 +0200 Subject: [PATCH] refactor: fix formatting --- debye_bec/devices/mo1_bragg/mo1_bragg.py | 17 ++-- .../devices/mo1_bragg/mo1_bragg_devices.py | 3 +- debye_bec/devices/nidaq/nidaq.py | 9 +- debye_bec/file_writer/__init__.py | 2 +- .../file_writer/debye_nexus_structure.py | 84 ++++++++++++------- .../metadata_schema_registry.py | 4 +- .../metadata_schema_xas_simple_scan.py | 2 + 7 files changed, 75 insertions(+), 46 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index 754dee4..dca0b14 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -16,14 +16,12 @@ from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import DeviceStatus, Signal, StatusBase from ophyd.status import SubscriptionStatus, WaitTimeoutError -from ophyd_devices import ProgressSignal +from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.utils.errors import DeviceStopError from pydantic import BaseModel, Field from typeguard import typechecked -from ophyd_devices import TransitionStatus, CompareStatus - from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner # pylint: disable=unused-import @@ -219,7 +217,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): if self.stopped is True: logger.warning(f"Resetting stopped in unstage for device {self.name}.") self._stopped = False - if self.scan_control.scan_msg.get() in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]: + if self.scan_control.scan_msg.get() in [ + ScanControlLoadMessage.STARTED, + ScanControlLoadMessage.SUCCESS, + ]: status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) self.cancel_on_stop(status) try: @@ -227,8 +228,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): return None except WaitTimeoutError: logger.warning( - f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}" - ) + f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}" + ) else: status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) self.cancel_on_stop(status) @@ -258,11 +259,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): else self.scan_control.scan_start_timer.put ) status = TransitionStatus( - self.scan_control.scan_status, + self.scan_control.scan_status, transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING], strict=True, raise_states=[ScanControlScanStatus.PARAMETER_WRONG], - ) + ) self.cancel_on_stop(status) start_func(1) return status diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py index 7e223e3..0977c17 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py @@ -94,7 +94,8 @@ class Mo1BraggCrystal(Device): current_xtal_string = Cpt( EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True - ) + ) + class Mo1BraggScanSettings(Device): """Mo1 Bragg PVs to set the scan setttings""" diff --git a/debye_bec/devices/nidaq/nidaq.py b/debye_bec/devices/nidaq/nidaq.py index a6e7188..e97a739 100644 --- a/debye_bec/devices/nidaq/nidaq.py +++ b/debye_bec/devices/nidaq/nidaq.py @@ -7,12 +7,10 @@ from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase from ophyd.status import SubscriptionStatus, WaitTimeoutError -from ophyd_devices import ProgressSignal +from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.sim.sim_signals import SetableSignal -from ophyd_devices import CompareStatus, TransitionStatus - from debye_bec.devices.nidaq.nidaq_enums import ( EncoderFactors, NIDAQCompression, @@ -502,13 +500,13 @@ class Nidaq(PSIDeviceBase, NidaqControl): Called after the device is connected and its signals are connected. Default values for signals should be set here. """ - status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False) + status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False) self.cancel_on_stop(status) try: status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached except WaitTimeoutError: logger.warning(f"Device {self.name} was not alive, trying to put power on") - status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False) + status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False) self.cancel_on_stop(status) self.power.put(1) @@ -589,7 +587,6 @@ class Nidaq(PSIDeviceBase, NidaqControl): logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan") return None - status = CompareStatus(self.state, NidaqState.KICKOFF) self.cancel_on_stop(status) status.wait(timeout=self._timeout_wait_for_pv) diff --git a/debye_bec/file_writer/__init__.py b/debye_bec/file_writer/__init__.py index 62f8ddd..30ec638 100644 --- a/debye_bec/file_writer/__init__.py +++ b/debye_bec/file_writer/__init__.py @@ -1 +1 @@ -from .debye_nexus_structure import DebyeNexusStructure \ No newline at end of file +from .debye_nexus_structure import DebyeNexusStructure diff --git a/debye_bec/file_writer/debye_nexus_structure.py b/debye_bec/file_writer/debye_nexus_structure.py index d1f1ce4..ebabcd7 100644 --- a/debye_bec/file_writer/debye_nexus_structure.py +++ b/debye_bec/file_writer/debye_nexus_structure.py @@ -1,10 +1,11 @@ from bec_server.file_writer.default_writer import DefaultFormat + class DebyeNexusStructure(DefaultFormat): - """ Nexus Structure for Debye""" + """Nexus Structure for Debye""" def format(self) -> None: - """ Specify the file format for the file writer.""" + """Specify the file format for the file writer.""" entry = self.storage.create_group(name="entry") entry.attrs["NX_class"] = "NXentry" @@ -17,81 +18,108 @@ class DebyeNexusStructure(DefaultFormat): # Logic if device exist if "mo1_bragg" in self.device_manager.devices: - + monochromator = instrument.create_group(name="monochromator") monochromator.attrs["NX_class"] = "NXmonochromator" crystal = monochromator.create_group(name="crystal") crystal.attrs["NX_class"] = "NXcrystal" # Create a dataset - chemical_formular = crystal.create_dataset(name="chemical_formular", data='Si') + chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si") chemical_formular.attrs["NX_class"] = "NX_CHAR" # Create a softlink - d_spacing = crystal.create_soft_link(name="d_spacing", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value") + d_spacing = crystal.create_soft_link( + name="d_spacing", + target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value", + ) d_spacing.attrs["NX_class"] = "NX_FLOAT" - offset = crystal.create_soft_link(name="offset", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value") + offset = crystal.create_soft_link( + name="offset", + target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value", + ) offset.attrs["NX_class"] = "NX_FLOAT" - reflection = crystal.create_soft_link(name="reflection", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value") + reflection = crystal.create_soft_link( + name="reflection", + target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value", + ) reflection.attrs["NX_class"] = "NX_CHAR" - ################## ## cm mirror specific information - ################### - + ################### + collimating_mirror = instrument.create_group(name="collimating_mirror") collimating_mirror.attrs["NX_class"] = "NXmirror" - cm_substrate_material = collimating_mirror.create_dataset(name="substrate_material", data='Si') + cm_substrate_material = collimating_mirror.create_dataset( + name="substrate_material", data="Si" + ) cm_substrate_material.attrs["NX_class"] = "NX_CHAR" - cm_bending_radius = collimating_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value") + cm_bending_radius = collimating_mirror.create_soft_link( + name="sagittal radius", + target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value", + ) cm_bending_radius.attrs["NX_class"] = "NX_FLOAT" - cm_bending_radius.attrs["units"] = 'km' + cm_bending_radius.attrs["units"] = "km" - cm_incidence_angle = collimating_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value") + cm_incidence_angle = collimating_mirror.create_soft_link( + name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value" + ) cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT" - cm_yaw_angle = collimating_mirror.create_soft_link(name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value") - cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT" + cm_yaw_angle = collimating_mirror.create_soft_link( + name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value" + ) + cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT" ################## ## fm mirror specific information - ################### - + ################### + focusing_mirror = instrument.create_group(name="focusing_mirror") focusing_mirror.attrs["NX_class"] = "NXmirror" - fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data='Si') + fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si") fm_substrate_material.attrs["NX_class"] = "NX_CHAR" - fm_bending_radius = focusing_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value") + fm_bending_radius = focusing_mirror.create_soft_link( + name="sagittal radius", + target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value", + ) fm_bending_radius.attrs["NX_class"] = "NX_FLOAT" - fm_incidence_angle = focusing_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value") + fm_incidence_angle = focusing_mirror.create_soft_link( + name="incidence angle", + target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value", + ) fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT" - fm_yaw_angle = focusing_mirror.create_soft_link(name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value") + fm_yaw_angle = focusing_mirror.create_soft_link( + name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value" + ) fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT" - fm_roll_angle = focusing_mirror.create_soft_link(name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value") + fm_roll_angle = focusing_mirror.create_soft_link( + name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value" + ) fm_roll_angle.attrs["NX_class"] = "NX_FLOAT" ################## ## source specific information - ################### + ################### source = instrument.create_group(name="source") source.attrs["NX_class"] = "NXsource" - beamline_name = source.create_dataset(name="beamline_name", data='Debye') + beamline_name = source.create_dataset(name="beamline_name", data="Debye") beamline_name.attrs["NX_class"] = "NX_CHAR" - facility_name = source.create_dataset(name="facility_name", data='Swiss Light Source') + facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source") facility_name.attrs["NX_class"] = "NX_CHAR" - probe = source.create_dataset(name="probe", data='X-ray') - probe.attrs["NX_class"] = "NX_CHAR" \ No newline at end of file + probe = source.create_dataset(name="probe", data="X-ray") + probe.attrs["NX_class"] = "NX_CHAR" diff --git a/debye_bec/scans/metadata_schema/metadata_schema_registry.py b/debye_bec/scans/metadata_schema/metadata_schema_registry.py index dd72375..2091a05 100644 --- a/debye_bec/scans/metadata_schema/metadata_schema_registry.py +++ b/debye_bec/scans/metadata_schema/metadata_schema_registry.py @@ -1,6 +1,6 @@ -#from .metadata_schema_xas_simple_scan import xas_simple_scan_schema +# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema -METADATA_SCHEMA_REGISTRY = {#"xas_simple_scan": xas_simple_scan_schema +METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema # Add models which should be used to validate scan metadata here. # Make a model according to the template, and import it as above # Then associate it with a scan like so: diff --git a/debye_bec/scans/metadata_schema/metadata_schema_xas_simple_scan.py b/debye_bec/scans/metadata_schema/metadata_schema_xas_simple_scan.py index b69ff33..5277c04 100644 --- a/debye_bec/scans/metadata_schema/metadata_schema_xas_simple_scan.py +++ b/debye_bec/scans/metadata_schema/metadata_schema_xas_simple_scan.py @@ -1,4 +1,6 @@ from bec_lib.metadata_schema import BasicScanMetadata + + # # class xas_simple_scan_schema(BasicScanMetadata):