refactor: fix formatting

This commit is contained in:
2025-06-20 09:19:51 +02:00
parent b3672cf5f5
commit abf432f2a9
7 changed files with 75 additions and 46 deletions

View File

@@ -16,14 +16,12 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import ProgressSignal
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
from ophyd_devices import TransitionStatus, CompareStatus
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
@@ -219,7 +217,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False
if self.scan_control.scan_msg.get() in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
if self.scan_control.scan_msg.get() in [
ScanControlLoadMessage.STARTED,
ScanControlLoadMessage.SUCCESS,
]:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
try:
@@ -227,8 +228,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
return None
except WaitTimeoutError:
logger.warning(
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
else:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
@@ -258,11 +259,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
else self.scan_control.scan_start_timer.put
)
status = TransitionStatus(
self.scan_control.scan_status,
self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True,
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
)
)
self.cancel_on_stop(status)
start_func(1)
return status

View File

@@ -94,7 +94,8 @@ class Mo1BraggCrystal(Device):
current_xtal_string = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
)
)
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""

View File

@@ -7,12 +7,10 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import ProgressSignal
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices import CompareStatus, TransitionStatus
from debye_bec.devices.nidaq.nidaq_enums import (
EncoderFactors,
NIDAQCompression,
@@ -502,13 +500,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
try:
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
except WaitTimeoutError:
logger.warning(f"Device {self.name} was not alive, trying to put power on")
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
self.power.put(1)
@@ -589,7 +587,6 @@ class Nidaq(PSIDeviceBase, NidaqControl):
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
status = CompareStatus(self.state, NidaqState.KICKOFF)
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)

View File

@@ -1 +1 @@
from .debye_nexus_structure import DebyeNexusStructure
from .debye_nexus_structure import DebyeNexusStructure

View File

@@ -1,10 +1,11 @@
from bec_server.file_writer.default_writer import DefaultFormat
class DebyeNexusStructure(DefaultFormat):
""" Nexus Structure for Debye"""
"""Nexus Structure for Debye"""
def format(self) -> None:
""" Specify the file format for the file writer."""
"""Specify the file format for the file writer."""
entry = self.storage.create_group(name="entry")
entry.attrs["NX_class"] = "NXentry"
@@ -17,81 +18,108 @@ class DebyeNexusStructure(DefaultFormat):
# Logic if device exist
if "mo1_bragg" in self.device_manager.devices:
monochromator = instrument.create_group(name="monochromator")
monochromator.attrs["NX_class"] = "NXmonochromator"
crystal = monochromator.create_group(name="crystal")
crystal.attrs["NX_class"] = "NXcrystal"
# Create a dataset
chemical_formular = crystal.create_dataset(name="chemical_formular", data='Si')
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
chemical_formular.attrs["NX_class"] = "NX_CHAR"
# Create a softlink
d_spacing = crystal.create_soft_link(name="d_spacing", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value")
d_spacing = crystal.create_soft_link(
name="d_spacing",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
)
d_spacing.attrs["NX_class"] = "NX_FLOAT"
offset = crystal.create_soft_link(name="offset", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value")
offset = crystal.create_soft_link(
name="offset",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
)
offset.attrs["NX_class"] = "NX_FLOAT"
reflection = crystal.create_soft_link(name="reflection", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value")
reflection = crystal.create_soft_link(
name="reflection",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
)
reflection.attrs["NX_class"] = "NX_CHAR"
##################
## cm mirror specific information
###################
###################
collimating_mirror = instrument.create_group(name="collimating_mirror")
collimating_mirror.attrs["NX_class"] = "NXmirror"
cm_substrate_material = collimating_mirror.create_dataset(name="substrate_material", data='Si')
cm_substrate_material = collimating_mirror.create_dataset(
name="substrate_material", data="Si"
)
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
cm_bending_radius = collimating_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value")
cm_bending_radius = collimating_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
)
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
cm_bending_radius.attrs["units"] = 'km'
cm_bending_radius.attrs["units"] = "km"
cm_incidence_angle = collimating_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value")
cm_incidence_angle = collimating_mirror.create_soft_link(
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
)
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle = collimating_mirror.create_soft_link(name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value")
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle = collimating_mirror.create_soft_link(
name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
)
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
##################
## fm mirror specific information
###################
###################
focusing_mirror = instrument.create_group(name="focusing_mirror")
focusing_mirror.attrs["NX_class"] = "NXmirror"
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data='Si')
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
fm_bending_radius = focusing_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value")
fm_bending_radius = focusing_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
)
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
fm_incidence_angle = focusing_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value")
fm_incidence_angle = focusing_mirror.create_soft_link(
name="incidence angle",
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
)
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
fm_yaw_angle = focusing_mirror.create_soft_link(name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value")
fm_yaw_angle = focusing_mirror.create_soft_link(
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
)
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
fm_roll_angle = focusing_mirror.create_soft_link(name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value")
fm_roll_angle = focusing_mirror.create_soft_link(
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
)
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
##################
## source specific information
###################
###################
source = instrument.create_group(name="source")
source.attrs["NX_class"] = "NXsource"
beamline_name = source.create_dataset(name="beamline_name", data='Debye')
beamline_name = source.create_dataset(name="beamline_name", data="Debye")
beamline_name.attrs["NX_class"] = "NX_CHAR"
facility_name = source.create_dataset(name="facility_name", data='Swiss Light Source')
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
facility_name.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data='X-ray')
probe.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data="X-ray")
probe.attrs["NX_class"] = "NX_CHAR"

View File

@@ -1,6 +1,6 @@
#from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = {#"xas_simple_scan": xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:

View File

@@ -1,4 +1,6 @@
from bec_lib.metadata_schema import BasicScanMetadata
#
#
class xas_simple_scan_schema(BasicScanMetadata):