refactor: fix formatting
This commit is contained in:
@@ -16,14 +16,12 @@ from bec_lib.logger import bec_logger
|
|||||||
from ophyd import Component as Cpt
|
from ophyd import Component as Cpt
|
||||||
from ophyd import DeviceStatus, Signal, StatusBase
|
from ophyd import DeviceStatus, Signal, StatusBase
|
||||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||||
from ophyd_devices import ProgressSignal
|
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||||
from ophyd_devices.utils.errors import DeviceStopError
|
from ophyd_devices.utils.errors import DeviceStopError
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
from typeguard import typechecked
|
from typeguard import typechecked
|
||||||
|
|
||||||
from ophyd_devices import TransitionStatus, CompareStatus
|
|
||||||
|
|
||||||
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
||||||
|
|
||||||
# pylint: disable=unused-import
|
# pylint: disable=unused-import
|
||||||
@@ -219,7 +217,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
|||||||
if self.stopped is True:
|
if self.stopped is True:
|
||||||
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
|
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
if self.scan_control.scan_msg.get() in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
|
if self.scan_control.scan_msg.get() in [
|
||||||
|
ScanControlLoadMessage.STARTED,
|
||||||
|
ScanControlLoadMessage.SUCCESS,
|
||||||
|
]:
|
||||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
try:
|
try:
|
||||||
@@ -227,8 +228,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
|||||||
return None
|
return None
|
||||||
except WaitTimeoutError:
|
except WaitTimeoutError:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
@@ -258,11 +259,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
|||||||
else self.scan_control.scan_start_timer.put
|
else self.scan_control.scan_start_timer.put
|
||||||
)
|
)
|
||||||
status = TransitionStatus(
|
status = TransitionStatus(
|
||||||
self.scan_control.scan_status,
|
self.scan_control.scan_status,
|
||||||
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
|
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
|
||||||
strict=True,
|
strict=True,
|
||||||
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
|
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
|
||||||
)
|
)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
start_func(1)
|
start_func(1)
|
||||||
return status
|
return status
|
||||||
|
|||||||
@@ -94,7 +94,8 @@ class Mo1BraggCrystal(Device):
|
|||||||
|
|
||||||
current_xtal_string = Cpt(
|
current_xtal_string = Cpt(
|
||||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
|
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class Mo1BraggScanSettings(Device):
|
class Mo1BraggScanSettings(Device):
|
||||||
"""Mo1 Bragg PVs to set the scan setttings"""
|
"""Mo1 Bragg PVs to set the scan setttings"""
|
||||||
|
|||||||
@@ -7,12 +7,10 @@ from bec_lib.logger import bec_logger
|
|||||||
from ophyd import Component as Cpt
|
from ophyd import Component as Cpt
|
||||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||||
from ophyd_devices import ProgressSignal
|
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||||
|
|
||||||
from ophyd_devices import CompareStatus, TransitionStatus
|
|
||||||
|
|
||||||
from debye_bec.devices.nidaq.nidaq_enums import (
|
from debye_bec.devices.nidaq.nidaq_enums import (
|
||||||
EncoderFactors,
|
EncoderFactors,
|
||||||
NIDAQCompression,
|
NIDAQCompression,
|
||||||
@@ -502,13 +500,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
|||||||
Called after the device is connected and its signals are connected.
|
Called after the device is connected and its signals are connected.
|
||||||
Default values for signals should be set here.
|
Default values for signals should be set here.
|
||||||
"""
|
"""
|
||||||
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
|
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
try:
|
try:
|
||||||
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
|
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
|
||||||
except WaitTimeoutError:
|
except WaitTimeoutError:
|
||||||
logger.warning(f"Device {self.name} was not alive, trying to put power on")
|
logger.warning(f"Device {self.name} was not alive, trying to put power on")
|
||||||
status = TransitionStatus(self.heartbeat, transitions=[0,1], strict=False)
|
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
self.power.put(1)
|
self.power.put(1)
|
||||||
|
|
||||||
@@ -589,7 +587,6 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
|||||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
status = CompareStatus(self.state, NidaqState.KICKOFF)
|
status = CompareStatus(self.state, NidaqState.KICKOFF)
|
||||||
self.cancel_on_stop(status)
|
self.cancel_on_stop(status)
|
||||||
status.wait(timeout=self._timeout_wait_for_pv)
|
status.wait(timeout=self._timeout_wait_for_pv)
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
from .debye_nexus_structure import DebyeNexusStructure
|
from .debye_nexus_structure import DebyeNexusStructure
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
from bec_server.file_writer.default_writer import DefaultFormat
|
from bec_server.file_writer.default_writer import DefaultFormat
|
||||||
|
|
||||||
|
|
||||||
class DebyeNexusStructure(DefaultFormat):
|
class DebyeNexusStructure(DefaultFormat):
|
||||||
""" Nexus Structure for Debye"""
|
"""Nexus Structure for Debye"""
|
||||||
|
|
||||||
def format(self) -> None:
|
def format(self) -> None:
|
||||||
""" Specify the file format for the file writer."""
|
"""Specify the file format for the file writer."""
|
||||||
|
|
||||||
entry = self.storage.create_group(name="entry")
|
entry = self.storage.create_group(name="entry")
|
||||||
entry.attrs["NX_class"] = "NXentry"
|
entry.attrs["NX_class"] = "NXentry"
|
||||||
@@ -17,81 +18,108 @@ class DebyeNexusStructure(DefaultFormat):
|
|||||||
|
|
||||||
# Logic if device exist
|
# Logic if device exist
|
||||||
if "mo1_bragg" in self.device_manager.devices:
|
if "mo1_bragg" in self.device_manager.devices:
|
||||||
|
|
||||||
monochromator = instrument.create_group(name="monochromator")
|
monochromator = instrument.create_group(name="monochromator")
|
||||||
monochromator.attrs["NX_class"] = "NXmonochromator"
|
monochromator.attrs["NX_class"] = "NXmonochromator"
|
||||||
crystal = monochromator.create_group(name="crystal")
|
crystal = monochromator.create_group(name="crystal")
|
||||||
crystal.attrs["NX_class"] = "NXcrystal"
|
crystal.attrs["NX_class"] = "NXcrystal"
|
||||||
|
|
||||||
# Create a dataset
|
# Create a dataset
|
||||||
chemical_formular = crystal.create_dataset(name="chemical_formular", data='Si')
|
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
|
||||||
chemical_formular.attrs["NX_class"] = "NX_CHAR"
|
chemical_formular.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
# Create a softlink
|
# Create a softlink
|
||||||
d_spacing = crystal.create_soft_link(name="d_spacing", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value")
|
d_spacing = crystal.create_soft_link(
|
||||||
|
name="d_spacing",
|
||||||
|
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
|
||||||
|
)
|
||||||
d_spacing.attrs["NX_class"] = "NX_FLOAT"
|
d_spacing.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
offset = crystal.create_soft_link(name="offset", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value")
|
offset = crystal.create_soft_link(
|
||||||
|
name="offset",
|
||||||
|
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
|
||||||
|
)
|
||||||
offset.attrs["NX_class"] = "NX_FLOAT"
|
offset.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
reflection = crystal.create_soft_link(name="reflection", target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value")
|
reflection = crystal.create_soft_link(
|
||||||
|
name="reflection",
|
||||||
|
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
|
||||||
|
)
|
||||||
reflection.attrs["NX_class"] = "NX_CHAR"
|
reflection.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
|
|
||||||
##################
|
##################
|
||||||
## cm mirror specific information
|
## cm mirror specific information
|
||||||
###################
|
###################
|
||||||
|
|
||||||
collimating_mirror = instrument.create_group(name="collimating_mirror")
|
collimating_mirror = instrument.create_group(name="collimating_mirror")
|
||||||
collimating_mirror.attrs["NX_class"] = "NXmirror"
|
collimating_mirror.attrs["NX_class"] = "NXmirror"
|
||||||
|
|
||||||
cm_substrate_material = collimating_mirror.create_dataset(name="substrate_material", data='Si')
|
cm_substrate_material = collimating_mirror.create_dataset(
|
||||||
|
name="substrate_material", data="Si"
|
||||||
|
)
|
||||||
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
cm_bending_radius = collimating_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value")
|
cm_bending_radius = collimating_mirror.create_soft_link(
|
||||||
|
name="sagittal radius",
|
||||||
|
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
|
||||||
|
)
|
||||||
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||||
cm_bending_radius.attrs["units"] = 'km'
|
cm_bending_radius.attrs["units"] = "km"
|
||||||
|
|
||||||
cm_incidence_angle = collimating_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value")
|
cm_incidence_angle = collimating_mirror.create_soft_link(
|
||||||
|
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
|
||||||
|
)
|
||||||
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
cm_yaw_angle = collimating_mirror.create_soft_link(name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value")
|
cm_yaw_angle = collimating_mirror.create_soft_link(
|
||||||
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
|
||||||
|
)
|
||||||
|
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
##################
|
##################
|
||||||
## fm mirror specific information
|
## fm mirror specific information
|
||||||
###################
|
###################
|
||||||
|
|
||||||
focusing_mirror = instrument.create_group(name="focusing_mirror")
|
focusing_mirror = instrument.create_group(name="focusing_mirror")
|
||||||
focusing_mirror.attrs["NX_class"] = "NXmirror"
|
focusing_mirror.attrs["NX_class"] = "NXmirror"
|
||||||
|
|
||||||
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data='Si')
|
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
|
||||||
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
fm_bending_radius = focusing_mirror.create_soft_link(name="sagittal radius", target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value")
|
fm_bending_radius = focusing_mirror.create_soft_link(
|
||||||
|
name="sagittal radius",
|
||||||
|
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
|
||||||
|
)
|
||||||
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
fm_incidence_angle = focusing_mirror.create_soft_link(name="incidence angle", target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value")
|
fm_incidence_angle = focusing_mirror.create_soft_link(
|
||||||
|
name="incidence angle",
|
||||||
|
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
|
||||||
|
)
|
||||||
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
fm_yaw_angle = focusing_mirror.create_soft_link(name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value")
|
fm_yaw_angle = focusing_mirror.create_soft_link(
|
||||||
|
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
|
||||||
|
)
|
||||||
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
fm_roll_angle = focusing_mirror.create_soft_link(name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value")
|
fm_roll_angle = focusing_mirror.create_soft_link(
|
||||||
|
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
|
||||||
|
)
|
||||||
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||||
|
|
||||||
##################
|
##################
|
||||||
## source specific information
|
## source specific information
|
||||||
###################
|
###################
|
||||||
|
|
||||||
source = instrument.create_group(name="source")
|
source = instrument.create_group(name="source")
|
||||||
source.attrs["NX_class"] = "NXsource"
|
source.attrs["NX_class"] = "NXsource"
|
||||||
|
|
||||||
beamline_name = source.create_dataset(name="beamline_name", data='Debye')
|
beamline_name = source.create_dataset(name="beamline_name", data="Debye")
|
||||||
beamline_name.attrs["NX_class"] = "NX_CHAR"
|
beamline_name.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
facility_name = source.create_dataset(name="facility_name", data='Swiss Light Source')
|
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
|
||||||
facility_name.attrs["NX_class"] = "NX_CHAR"
|
facility_name.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|
||||||
probe = source.create_dataset(name="probe", data='X-ray')
|
probe = source.create_dataset(name="probe", data="X-ray")
|
||||||
probe.attrs["NX_class"] = "NX_CHAR"
|
probe.attrs["NX_class"] = "NX_CHAR"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
||||||
|
|
||||||
METADATA_SCHEMA_REGISTRY = {#"xas_simple_scan": xas_simple_scan_schema
|
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
|
||||||
# Add models which should be used to validate scan metadata here.
|
# Add models which should be used to validate scan metadata here.
|
||||||
# Make a model according to the template, and import it as above
|
# Make a model according to the template, and import it as above
|
||||||
# Then associate it with a scan like so:
|
# Then associate it with a scan like so:
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from bec_lib.metadata_schema import BasicScanMetadata
|
from bec_lib.metadata_schema import BasicScanMetadata
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
class xas_simple_scan_schema(BasicScanMetadata):
|
class xas_simple_scan_schema(BasicScanMetadata):
|
||||||
|
|||||||
Reference in New Issue
Block a user