refactor: fix formatting
This commit is contained in:
@@ -16,14 +16,12 @@ from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Signal, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import ProgressSignal
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.errors import DeviceStopError
|
||||
from pydantic import BaseModel, Field
|
||||
from typeguard import typechecked
|
||||
|
||||
from ophyd_devices import TransitionStatus, CompareStatus
|
||||
|
||||
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
||||
|
||||
# pylint: disable=unused-import
|
||||
@@ -219,7 +217,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
if self.stopped is True:
|
||||
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
|
||||
self._stopped = False
|
||||
if self.scan_control.scan_msg.get() in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
|
||||
if self.scan_control.scan_msg.get() in [
|
||||
ScanControlLoadMessage.STARTED,
|
||||
ScanControlLoadMessage.SUCCESS,
|
||||
]:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
@@ -227,8 +228,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
return None
|
||||
except WaitTimeoutError:
|
||||
logger.warning(
|
||||
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
||||
)
|
||||
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
||||
)
|
||||
else:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
@@ -258,11 +259,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
else self.scan_control.scan_start_timer.put
|
||||
)
|
||||
status = TransitionStatus(
|
||||
self.scan_control.scan_status,
|
||||
self.scan_control.scan_status,
|
||||
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
|
||||
strict=True,
|
||||
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
|
||||
)
|
||||
)
|
||||
self.cancel_on_stop(status)
|
||||
start_func(1)
|
||||
return status
|
||||
|
||||
@@ -94,7 +94,8 @@ class Mo1BraggCrystal(Device):
|
||||
|
||||
current_xtal_string = Cpt(
|
||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class Mo1BraggScanSettings(Device):
|
||||
"""Mo1 Bragg PVs to set the scan setttings"""
|
||||
|
||||
@@ -7,12 +7,10 @@ from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import ProgressSignal
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
|
||||
from debye_bec.devices.nidaq.nidaq_enums import (
|
||||
EncoderFactors,
|
||||
NIDAQCompression,
|
||||
@@ -502,13 +500,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
|
||||
except WaitTimeoutError:
|
||||
logger.warning(f"Device {self.name} was not alive, trying to put power on")
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
self.power.put(1)
|
||||
|
||||
@@ -589,7 +587,6 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||
return None
|
||||
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.KICKOFF)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .debye_nexus_structure import DebyeNexusStructure
|
||||
from .debye_nexus_structure import DebyeNexusStructure
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
|
||||
class DebyeNexusStructure(DefaultFormat):
|
||||
""" Nexus Structure for Debye"""
|
||||
"""Nexus Structure for Debye"""
|
||||
|
||||
def format(self) -> None:
|
||||
""" Specify the file format for the file writer."""
|
||||
"""Specify the file format for the file writer."""
|
||||
|
||||
entry = self.storage.create_group(name="entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
@@ -17,81 +18,108 @@ class DebyeNexusStructure(DefaultFormat):
|
||||
|
||||
# Logic if device exist
|
||||
if "mo1_bragg" in self.device_manager.devices:
|
||||
|
||||
|
||||
monochromator = instrument.create_group(name="monochromator")
|
||||
monochromator.attrs["NX_class"] = "NXmonochromator"
|
||||
crystal = monochromator.create_group(name="crystal")
|
||||
crystal.attrs["NX_class"] = "NXcrystal"
|
||||
|
||||
# Create a dataset
|
||||
chemical_formular = crystal.create_dataset(name="chemical_formular", data='Si')
|
||||
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
|
||||
chemical_formular.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
# Create a softlink
|
||||
d_spacing = crystal.create_soft_link(name="d_spacing", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value")
|
||||
d_spacing = crystal.create_soft_link(
|
||||
name="d_spacing",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
|
||||
)
|
||||
d_spacing.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
offset = crystal.create_soft_link(name="offset", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value")
|
||||
offset = crystal.create_soft_link(
|
||||
name="offset",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
|
||||
)
|
||||
offset.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
reflection = crystal.create_soft_link(name="reflection", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value")
|
||||
reflection = crystal.create_soft_link(
|
||||
name="reflection",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
|
||||
)
|
||||
reflection.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
|
||||
##################
|
||||
## cm mirror specific information
|
||||
###################
|
||||
|
||||
###################
|
||||
|
||||
collimating_mirror = instrument.create_group(name="collimating_mirror")
|
||||
collimating_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
cm_substrate_material = collimating_mirror.create_dataset(name="substrate_material", data='Si')
|
||||
cm_substrate_material = collimating_mirror.create_dataset(
|
||||
name="substrate_material", data="Si"
|
||||
)
|
||||
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
cm_bending_radius = collimating_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value")
|
||||
cm_bending_radius = collimating_mirror.create_soft_link(
|
||||
name="sagittal radius",
|
||||
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
|
||||
)
|
||||
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_bending_radius.attrs["units"] = 'km'
|
||||
cm_bending_radius.attrs["units"] = "km"
|
||||
|
||||
cm_incidence_angle = collimating_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value")
|
||||
cm_incidence_angle = collimating_mirror.create_soft_link(
|
||||
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
|
||||
)
|
||||
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
cm_yaw_angle = collimating_mirror.create_soft_link(name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value")
|
||||
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_yaw_angle = collimating_mirror.create_soft_link(
|
||||
name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
|
||||
)
|
||||
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
##################
|
||||
## fm mirror specific information
|
||||
###################
|
||||
|
||||
###################
|
||||
|
||||
focusing_mirror = instrument.create_group(name="focusing_mirror")
|
||||
focusing_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data='Si')
|
||||
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
|
||||
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
fm_bending_radius = focusing_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value")
|
||||
fm_bending_radius = focusing_mirror.create_soft_link(
|
||||
name="sagittal radius",
|
||||
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
|
||||
)
|
||||
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_incidence_angle = focusing_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value")
|
||||
fm_incidence_angle = focusing_mirror.create_soft_link(
|
||||
name="incidence angle",
|
||||
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
|
||||
)
|
||||
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_yaw_angle = focusing_mirror.create_soft_link(name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value")
|
||||
fm_yaw_angle = focusing_mirror.create_soft_link(
|
||||
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
|
||||
)
|
||||
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_roll_angle = focusing_mirror.create_soft_link(name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value")
|
||||
fm_roll_angle = focusing_mirror.create_soft_link(
|
||||
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
|
||||
)
|
||||
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
##################
|
||||
## source specific information
|
||||
###################
|
||||
###################
|
||||
|
||||
source = instrument.create_group(name="source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
|
||||
beamline_name = source.create_dataset(name="beamline_name", data='Debye')
|
||||
beamline_name = source.create_dataset(name="beamline_name", data="Debye")
|
||||
beamline_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
facility_name = source.create_dataset(name="facility_name", data='Swiss Light Source')
|
||||
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
|
||||
facility_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
probe = source.create_dataset(name="probe", data='X-ray')
|
||||
probe.attrs["NX_class"] = "NX_CHAR"
|
||||
probe = source.create_dataset(name="probe", data="X-ray")
|
||||
probe.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
||||
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
||||
|
||||
METADATA_SCHEMA_REGISTRY = {#"xas_simple_scan": xas_simple_scan_schema
|
||||
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
|
||||
# Add models which should be used to validate scan metadata here.
|
||||
# Make a model according to the template, and import it as above
|
||||
# Then associate it with a scan like so:
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from bec_lib.metadata_schema import BasicScanMetadata
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
class xas_simple_scan_schema(BasicScanMetadata):
|
||||
|
||||
Reference in New Issue
Block a user