25 Commits

Author SHA1 Message Date
4a7e231518 feat(label_box): initial demo 2025-05-07 15:06:22 +02:00
ci_update_bot
20759e7ff1 docs: Update device list 2025-05-07 11:44:11 +00:00
b03b90a86a fix: fix range checks in Mo1Bragg and IonizationChamber 2025-05-07 13:38:43 +02:00
74e0b01b02 fix: temporary comment, issue created #16 2025-05-07 12:52:50 +02:00
gac-x01da
7b7a24b6c8 refactor: formatting 2025-05-07 12:40:13 +02:00
gac-x01da
31ff28236b fix: update config, remove cameras for the moment 2025-05-07 12:38:41 +02:00
gac-x01da
002a3323a0 fix(ion-chambers): fix ion chamber code at beamline 2025-05-07 12:38:04 +02:00
gac-x01da
24d81bb180 build: update black dependency to ~=25.0 2025-05-07 12:37:33 +02:00
gac-x01da
32e24cd92a fix: fix occasional crash of mo1_bragg for scan; closes #11 2025-05-07 11:15:21 +02:00
03e3b1c605 fix: fix imports in basler_cam 2025-05-06 16:26:27 +02:00
6ab1a2941c fix: fix typo in device config mo1_bragg 2025-05-06 11:19:18 +02:00
b8a050c424 tests: fix DeviceMessages for tests 2025-05-05 15:14:11 +02:00
gac-x01da
a8e7325f0f fix(mo1-bragg): fix error upon fresh start, not yet working. 2025-05-05 14:31:03 +02:00
gac-x01da
ae50cbdfd1 refactor(device-config): extend device config 2025-05-05 14:31:03 +02:00
gac-x01da
c164414631 fix(camera): add throttled update to cameras 2025-05-05 14:31:03 +02:00
gac-x01da
c074240444 tpying fix 2025-05-05 14:30:36 +02:00
gac-x01da
759636cf2c fix type reffoil changer 2025-05-05 14:30:36 +02:00
gac-x01da
510073d2f2 fix(reffoil-changer): add scaninfo to __init__ signature 2025-05-05 14:30:36 +02:00
gac-x01da
17b671dd4b Introduction of reference foil changer 2025-05-05 14:30:36 +02:00
gac-x01da
fe3e8b6291 fix: update devices in configs, to be checked 2025-05-05 14:30:36 +02:00
gac-x01da
b7f72f8e44 wip support BEC core scans 2025-05-05 14:28:55 +02:00
gac-x01da
8cad42920b Changed readout priority from async to baseline for new devices 2025-05-05 14:28:39 +02:00
gac-x01da
c43ca4aaa8 fix (mo1-bragg): fix code after test with hardware at the beamline 2025-05-05 14:28:26 +02:00
ce046f55f6 refactor (mo1-bragg): refactored Mo1 Bragg class with new base class PSIDeviceBase 2025-05-05 14:28:26 +02:00
gac-x01da
849b2d2bdb feat: add camera and power supply ophyd classes 2025-05-05 14:27:43 +02:00
19 changed files with 1110 additions and 351 deletions

View File

@@ -1,6 +1,6 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
is started. It can be used to add additional command line arguments.
"""
from bec_lib.service_config import ServiceConfig

View File

@@ -0,0 +1,150 @@
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QGridLayout, QSizePolicy, QVBoxLayout, QPushButton
from qtpy.QtWidgets import QWidget
from qtpy.QtWidgets import QGroupBox
from qtpy.QtWidgets import QLabel
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.utils import ConnectionConfig
class BECLabelBox(BECWidget, QGroupBox):
PLUGIN = True
RPC = True
ICON_NAME = "inventory_2"
USER_ACCESS = ["qroup_box_title", "qroup_box_title.setter", "add_label", "labels"]
def __init__(
self,
parent: QWidget | None = None,
config: ConnectionConfig | None = None,
client=None,
gui_id: str | None = None,
group_box_title: str = "Label",
label_config: dict = None,
**kwargs,
) -> None:
if config is None:
config = ConnectionConfig(widget_class=self.__class__.__name__)
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self.setTitle(group_box_title)
self.layout = QGridLayout(self)
self._labels = []
if label_config is not None:
self.apply_label_config(label_config)
def apply_label_config(self, label_config: dict):
"""Apply the label configuration."""
for label, config in label_config.items():
defaults_value = config.get("defaults_value", 0.0)
units = config.get("units", None)
self.add_label(label, defaults_value=defaults_value, units=units)
@property
def labels(self):
"""Return the list of labels."""
return self._labels
@SafeProperty(str)
def qroup_box_title(self):
"""Label of the group box."""
return self.title()
@qroup_box_title.setter
def qroup_box_title(self, value: str):
"""Set the label of the group box."""
self.setTitle(value)
def add_label(self, label: str, defaults_value: float = 0.0, units: str = None):
"""Add a label to the group box."""
text_label = QLabel(parent=self)
text_label.setText(label)
value_label = QLabel(parent=self)
value_label.setText(str(defaults_value))
unit_label = QLabel(parent=self)
unit_label.setText(units if units else "")
spacer = QWidget(self)
spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.layout.addWidget(text_label, len(self.labels), 0)
self.layout.addWidget(spacer, len(self.labels), 1)
self.layout.addWidget(value_label, len(self.labels), 2)
self.layout.addWidget(unit_label, len(self.labels), 3)
label_holder = {
"text_label": text_label,
"value_label": value_label,
"unit_label": unit_label,
}
self._labels.append(label_holder)
@SafeSlot(float, int)
def update_label(self, value: float, index: int):
"""Update the label value."""
if index < len(self.labels):
self.labels[index]["value_label"].setText(str(value))
else:
raise IndexError("Index out of range for labels list.")
class DemoGUI(QWidget):
update_signal = Signal(float, int)
def __init__(self):
super().__init__()
self.setWindowTitle("BECLabelBox Demo")
self.layout = QVBoxLayout(self)
label_config = {
"Label 1": {"defaults_value": 0.0, "units": "m"},
"Label 2": {"defaults_value": 1.0, "units": "cm"},
"Label 3": {"defaults_value": 2.0, "units": "mm"},
}
self.label_box = BECLabelBox(
self, group_box_title="Demo Label Box", label_config=label_config
)
self.layout.addWidget(self.label_box)
# self.label_box.apply_label_config(label_config)
# alternative approach to add labels without dict config
# self.label_box.add_label("Label 1", defaults_value=0.0, units="m")
# self.label_box.add_label("Label 2", defaults_value=1.0, units="cm")
# self.label_box.add_label("Label 3", defaults_value=2.0, units="mm")
#
self.button = QPushButton("Update Labels", self)
self.button.clicked.connect(self.update_labels)
self.layout.addWidget(self.button)
self.update_signal.connect(self.label_box.update_label)
def update_labels(self):
"""Update the labels with new values."""
for i, label in enumerate(self.label_box.labels):
new_value = (i + 1) * 10.0
self.update_signal.emit(new_value, i)
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
demo = DemoGUI()
demo.show()
sys.exit(app.exec_())
#
# if __name__ == "__main__":
# import sys
# from qtpy.QtWidgets import QApplication
#
# app = QApplication(sys.argv)
# widget = BECLabelBox(group_box_title="Test Label Box")
# widget.show()
# widget.add_label("Test Label 1")
# widget.add_label("Test Label 2", units="m")
# widget.add_label("Test Label 3", defaults_value=1.23456789, units="m")
# sys.exit(app.exec_())

View File

@@ -210,7 +210,7 @@ cm_xstripe:
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.mo1.bragg.Mo1Bragg
deviceClass: debye_bec.devices.mo1_bragg.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry

View File

@@ -1,3 +1,205 @@
## Slit Diaphragm -- Physical positioners
sldi_trxr:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sldi_trxw:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryb:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryt:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
## Slit Diaphragm -- Virtual positioners
sldi_centerx:
readoutPriority: baseline
description: Front-end slit diaphragm X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapx:
readoutPriority: baseline
description: Front-end slit diaphragm X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centery:
readoutPriority: baseline
description: Front-end slit diaphragm Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapy:
readoutPriority: baseline
description: Front-end slit diaphragm Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
## Collimating Mirror -- Physical Positioners
cm_trxu:
readoutPriority: baseline
description: Collimating Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trxd:
readoutPriority: baseline
description: Collimating Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
cm_tryu:
readoutPriority: baseline
description: Collimating Mirror Y-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydr:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYDR
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydw:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYDW
onFailure: retry
enabled: true
softwareTrigger: false
cm_bnd:
readoutPriority: baseline
description: Collimating Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:BND
onFailure: retry
enabled: true
softwareTrigger: false
## Collimating Mirror -- Virtual Positioners
cm_rotx:
readoutPriority: baseline
description: Collimating Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
cm_roty:
readoutPriority: baseline
description: Collimating Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
cm_rotz:
readoutPriority: baseline
description: Collimating Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
cm_trx:
readoutPriority: baseline
description: Collimating Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_try:
readoutPriority: baseline
description: Collimating Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ztcp:
readoutPriority: baseline
description: Collimating Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_xstripe:
readoutPriority: baseline
description: Collimating Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
## Bragg Monochromator
mo1_bragg:
readoutPriority: baseline
@@ -29,35 +231,65 @@ nidaq:
enabled: true
softwareTrigger: false
## Monochromator -- Physical Positioners
mo_try:
readoutPriority: baseline
description: Monochromator Y Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo_trx:
readoutPriority: baseline
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo_roty:
readoutPriority: baseline
description: Monochromator Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
# Ionization Chambers
# ic0:
# readoutPriority: baseline
# description: Ionization chamber 0
# deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber0
# deviceConfig:
# prefix: "X01DA-"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# ic1:
# readoutPriority: baseline
# description: Ionization chamber 1
# deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber1
# deviceConfig:
# prefix: "X01DA-"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# ic2:
# readoutPriority: baseline
# description: Ionization chamber 2
# deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber2
# deviceConfig:
# prefix: "X01DA-"
# onFailure: retry
# enabled: true
# softwareTrigger: false
ic0:
readoutPriority: baseline
description: Ionization chamber 0
deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber0
deviceConfig:
prefix: "X01DA-"
onFailure: retry
enabled: true
softwareTrigger: false
ic1:
readoutPriority: baseline
description: Ionization chamber 1
deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber1
deviceConfig:
prefix: "X01DA-"
onFailure: retry
enabled: true
softwareTrigger: false
ic2:
readoutPriority: baseline
description: Ionization chamber 2
deviceClass: debye_bec.devices.ionization_chambers.ionization_chamber.IonizationChamber2
deviceConfig:
prefix: "X01DA-"
onFailure: retry
enabled: true
softwareTrigger: false
# ES0 Filter
@@ -71,6 +303,18 @@ es0filter:
enabled: true
softwareTrigger: false
# Reference foil changer
reffoilchanger:
readoutPriority: baseline
description: ES2 reference foil changer
deviceClass: debye_bec.devices.reffoilchanger.Reffoilchanger
deviceConfig:
prefix: "X01DA-"
onFailure: retry
enabled: true
softwareTrigger: false
# Beam Monitors
# beam_monitor_1:
@@ -94,14 +338,14 @@ es0filter:
# softwareTrigger: false
# xray_eye:
# readoutPriority: async
# description: X-ray eye
# deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
# deviceConfig:
# prefix: "X01DA-ES-XRAYEYE:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# readoutPriority: async
# description: X-ray eye
# deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
# deviceConfig:
# prefix: "X01DA-ES-XRAYEYE:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Pilatus Curtain
# pilatus_curtain:

View File

@@ -1,13 +1,18 @@
# from ophyd_devices.sim.sim_signals import SetableSignal
# from ophyd_devices.utils.psi_component import PSIComponent, SignalType
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Kind
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class BaslerCamBase(ADBase):
cam1 = ADCpt(AravisDetectorCam, "cam1:")
@@ -18,12 +23,21 @@ class BaslerCam(PSIDeviceBase, BaslerCamBase):
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height, width))
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
# self.preview_2d.put(data)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)

View File

@@ -1,3 +1,8 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
@@ -7,6 +12,9 @@ from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class ProsilicaCamBase(ADBase):
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
@@ -15,11 +23,19 @@ class ProsilicaCamBase(ADBase):
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height, width))
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)

View File

@@ -3,15 +3,26 @@
### debye_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| Amplifiers | Class for the ES HV power supplies | [debye_bec.devices.hv_supplies](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/hv_supplies.py) |
| ES0Filter | Class for the ES0 filter station | [debye_bec.devices.es0filter](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/es0filter.py) |
| BaslerCam | | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| BaslerCamBase | | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| ES0Filter | Class for the ES0 filter station X01DA-ES0-FI: | [debye_bec.devices.es0filter](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/es0filter.py) |
| GasMixSetup | Class for the ES2 Pilatus Curtain | [debye_bec.devices.pilatus_curtain](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/pilatus_curtain.py) |
| Mo1Bragg | Class for the Mo1 Bragg positioner of the Debye beamline.<br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to<br> the NI motor controller via web sockets.<br> | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggCalculator | Mo1 Bragg PVs to convert angle to energy or vice-versa. | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggCrystal | Mo1 Bragg PVs to set the crystal parameters | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggEncoder | Mo1 Bragg PVs to communicate with the encoder | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggScanControl | Mo1 Bragg PVs to control the scan after setting the parameters. | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggScanSettings | Mo1 Bragg PVs to set the scan setttings | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggStatus | Mo1 Bragg PVs for status monitoring | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1TriggerSettings | Mo1 Trigger settings | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| NIDAQ | NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05<br> <br> Args:<br> prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:<br> name (str) : Name of the device<br> kind (Kind) : Ophyd Kind of the device<br> parent (Device) : Parent clas<br> device_manager : device manager as forwarded by BEC<br> | [debye_bec.devices.nidaq](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/nidaq.py) |
| GasMixSetupControl | GasMixSetup Control for Inonization Chamber 0 | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| HighVoltageSuppliesControl | HighVoltage Supplies Control for Ionization Chamber 0 | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| IonizationChamber0 | Ionization Chamber 0, prefix should be 'X01DA-'. | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| IonizationChamber1 | Ionization Chamber 1, prefix should be 'X01DA-'. | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| IonizationChamber2 | Ionization Chamber 2, prefix should be 'X01DA-'. | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| Mo1Bragg | Mo1 Bragg motor for the Debye beamline.<br><br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:<br> | [debye_bec.devices.mo1_bragg.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg.py) |
| Mo1BraggCalculator | Mo1 Bragg PVs to convert angle to energy or vice-versa. | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggCrystal | Mo1 Bragg PVs to set the crystal parameters | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggEncoder | Mo1 Bragg PVs to communicate with the encoder | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggPositioner | <br> Positioner implementation of the MO1 Bragg positioner.<br><br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:<br> This soft IOC connects to the NI motor and its control loop.<br> | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggScanControl | Mo1 Bragg PVs to control the scan after setting the parameters. | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggScanSettings | Mo1 Bragg PVs to set the scan setttings | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggStatus | Mo1 Bragg PVs for status monitoring | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1TriggerSettings | Mo1 Trigger settings | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Nidaq | NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05<br><br> Args:<br> prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:<br> name (str) : Name of the device<br> scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.<br> | [debye_bec.devices.nidaq.nidaq](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/nidaq/nidaq.py) |
| NidaqControl | Nidaq control class with all PVs | [debye_bec.devices.nidaq.nidaq](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/nidaq/nidaq.py) |
| ProsilicaCam | | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| ProsilicaCamBase | | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| Reffoilchanger | Class for the ES2 Reference Foil Changer | [debye_bec.devices.reffoilchanger](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/reffoilchanger.py) |

View File

@@ -1,4 +1,6 @@
from typing import Literal
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import numpy as np
from ophyd import Component as Cpt
@@ -15,6 +17,9 @@ from debye_bec.devices.ionization_chambers.ionization_chamber_enums import (
AmplifierGain,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class EpicsSignalSplit(EpicsSignal):
"""Wrapper around EpicsSignal with different read and write pv"""
@@ -44,21 +49,22 @@ class GasMixSetupControl(Device):
gas2 = Cpt(EpicsSignalRO, suffix="Gas2", kind="config", doc="Gas 2")
conc2 = Cpt(EpicsSignalRO, suffix="Conc2", kind="config", doc="Concentration 2")
press = Cpt(EpicsSignalRO, suffix="PressTransm", kind="config", doc="Current Pressure")
status_msg = Cpt(EpicsSignalRO, suffix="StatusMsg0", kind="config", doc="Status")
class HighVoltageSuppliesControl(Device):
"""HighVoltage Supplies Control for Ionization Chamber 0"""
hv_v = Cpt(EpicsSignalSplit, suffix="HV1-V", kind="config", doc="HV voltage")
hv_i = Cpt(EpicsSignalSplit, suffix="HV1-I", kind="config", doc="HV current")
grid_v = Cpt(EpicsSignalSplit, suffix="HV2-V", kind="config", doc="Grid voltage")
grid_i = Cpt(EpicsSignalSplit, suffix="HV2-I", kind="config", doc="Grid current")
hv_v = Cpt(EpicsSignalSplit, suffix="HV2-V", kind="config", doc="HV voltage")
hv_i = Cpt(EpicsSignalSplit, suffix="HV2-I", kind="config", doc="HV current")
grid_v = Cpt(EpicsSignalSplit, suffix="HV1-V", kind="config", doc="Grid voltage")
grid_i = Cpt(EpicsSignalSplit, suffix="HV1-I", kind="config", doc="Grid current")
class IonizationChamber0(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
USER_ACCESS = ["set_gain", "set_filter", "set_hv", "set_grid", "fill"]
num = 1
amp_signals = {
"cOnOff": (
@@ -79,6 +85,7 @@ class IonizationChamber0(PSIDeviceBase):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES1-IC{num-1}:")
hv_en_signals = {
"ext_ena": (
@@ -86,13 +93,13 @@ class IonizationChamber0(PSIDeviceBase):
"ES1-IC0:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES1-IC0:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
"ena": (EpicsSignal, "ES1-IC0:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)
def __init__(self, name: str, scan_info=None, **kwargs):
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
self.timeout_for_pvwait = 2.5
super().__init__(name=name, scan_info=scan_info, **kwargs)
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
@typechecked
def set_gain(self, gain: Literal["1e6", "1e7", "5e7", "1e8", "1e9"] | AmplifierGain) -> None:
@@ -180,9 +187,9 @@ class IonizationChamber0(PSIDeviceBase):
hv (float) : Desired voltage for the 'HV' terminal. Voltage has to be between 0...3000
"""
if not 0 < hv < 3000:
if not (0 <= hv <= 3000):
raise ValueError(f"specified HV {hv} not within range [0 .. 3000]")
if self.hv.grid_v.get() > hv:
if not np.isclose(np.abs(hv - self.hv.grid_v.get()), 0, atol=3):
raise ValueError(f"Grid {self.hv.grid_v.get()} must not be higher than HV {hv}!")
if not self.hv_en.ena.get() == 1:
@@ -208,9 +215,9 @@ class IonizationChamber0(PSIDeviceBase):
grid (float) : Desired voltage for the 'Grid' terminal, Grid Voltage has to be between 0...3000
"""
if not 0 < grid < 3000:
if not (0 <= grid <= 3000):
raise ValueError(f"specified Grid {grid} not within range [0 .. 3000]")
if grid > self.hv.hv_v.get():
if not np.isclose(np.abs(grid - self.hv.hv_v.get()), 0, atol=3):
raise ValueError(f"Grid {grid} must not be higher than HV {self.hv.hv_v.get()}!")
if not self.hv_en.ena.get() == 1:
@@ -249,13 +256,13 @@ class IonizationChamber0(PSIDeviceBase):
wait (bool): If you like to wait for the filling to finish.
"""
if 100 < conc1 < 0:
if not (0 <= conc1 <= 100):
raise ValueError(f"Concentration 1 {conc1} out of range [0 .. 100 %]")
if 100 < conc2 < 0:
if not (0 <= conc2 <= 100):
raise ValueError(f"Concentration 2 {conc2} out of range [0 .. 100 %]")
if not np.isclose((conc1 + conc2), 100, atol=0.1):
raise ValueError(f"Conc1 {conc1} and conc2 {conc2} must sum to 100 +- 0.1")
if 3 < pressure < 0:
if not (0 <= pressure <= 3):
raise ValueError(f"Pressure {pressure} out of range [0 .. 3 bar abs]")
self.gmes.gas1_req.set(gas1).wait(timeout=3)
@@ -271,7 +278,7 @@ class IonizationChamber0(PSIDeviceBase):
timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError(
f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes.status_msg.get()}"
f"Ionization chamber filling process did not start after {timeout}s. Last log message {self.gmes_status.get()}"
)
def wait_for_filling_finished():
@@ -286,8 +293,8 @@ class IonizationChamber0(PSIDeviceBase):
return status
class IonizationChamber1(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
class IonizationChamber1(IonizationChamber0):
"""Ionization Chamber 1, prefix should be 'X01DA-'."""
num = 2
amp_signals = {
@@ -309,6 +316,7 @@ class IonizationChamber1(PSIDeviceBase):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena": (
@@ -316,13 +324,13 @@ class IonizationChamber1(PSIDeviceBase):
"ES2-IC12:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
"ena": (EpicsSignal, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)
class IonizationChamber2(PSIDeviceBase):
"""Ionization Chamber 0, prefix should be 'X01DA-'."""
class IonizationChamber2(IonizationChamber0):
"""Ionization Chamber 2, prefix should be 'X01DA-'."""
num = 3
amp_signals = {
@@ -344,6 +352,7 @@ class IonizationChamber2(PSIDeviceBase):
}
amp = Dcpt(amp_signals)
gmes = Cpt(GasMixSetupControl, suffix=f"ES-GMES:IC{num-1}")
gmes_status = Cpt(EpicsSignalRO, suffix="ES-GMES:StatusMsg0", kind="config", doc="Status")
hv = Cpt(HighVoltageSuppliesControl, suffix=f"ES2-IC{num-1}:")
hv_en_signals = {
"ext_ena": (
@@ -351,6 +360,6 @@ class IonizationChamber2(PSIDeviceBase):
"ES2-IC12:HV-Ext-Ena",
{"kind": "config", "doc": "External enable signal of HV"},
),
"ena": (EpicsSignalRO, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
"ena": (EpicsSignal, "ES2-IC12:HV-Ena", {"kind": "config", "doc": "Enable signal of HV"}),
}
hv_en = Dcpt(hv_en_signals)

View File

@@ -14,7 +14,8 @@ from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
@@ -200,39 +201,44 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
# Load the scan parameters to the controller
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
status = self.task_handler.submit_task(
task=self.wait_for_signal,
task_args=(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS),
self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.SUCCESS,
timeout=self.timeout_for_pvwait,
)
return status
return None
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def unstage_procedure():
if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False
current_state = self.scan_control.scan_msg.get()
# Case 1, message is already ScanControlLoadMessage.PENDING
if current_state == ScanControlLoadMessage.PENDING:
return None
# Case 2, probably called after scan, backend should resolve on its own. Timeout to wait
if current_state in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
try:
self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.PENDING,
timeout=self.timeout_for_pvwait / 2,
timeout=self.timeout_for_pvwait,
)
return
except TimeoutError:
logger.warning(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan validation"
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
time.sleep(0.25)
start_time = time.time()
while time.time() - start_time < self.timeout_for_pvwait / 2:
if not self.scan_control.scan_msg.get() == ScanControlLoadMessage.PENDING:
break
time.sleep(0.1)
raise TimeoutError(
f"Device {self.name} run into timeout after {self.timeout_for_pvwait} while waiting for scan validation"
)
status = self.task_handler.submit_task(unstage_procedure)
status.wait()
def callback(*, old_value, value, **kwargs):
if value == ScanControlLoadMessage.PENDING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
return None
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
@@ -268,11 +274,14 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
def callback(*, old_value, value, **kwargs):
if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_status, callback=callback)
start_func(1)
status = self.task_handler.submit_task(
task=self.wait_for_signal,
task_args=(self.scan_control.scan_status, ScanControlScanStatus.RUNNING),
)
return status
def on_stop(self) -> None:
@@ -328,7 +337,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
time.sleep(0.1)
# If we end up here, the status did not resolve
raise TimeoutError(
f"Device {self.name} run into timeout after {timeout}s while waiting for scan to start"
f"Device {self.name} run into timeout after {timeout}s for signal {signal.name} with value {signal.get()}, expected {value}"
)
@typechecked
@@ -454,20 +463,29 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Raises:
TimeoutError: If the scan message is not available after the timeout
"""
state = self.scan_control.scan_msg.get()
if state != target_state:
try:
self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=1)
except TimeoutError as exc:
logger.warning(
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, "
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(self.scan_control.scan_msg.get())}, "
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s"
)
self.scan_control.scan_val_reset.put(1)
# Sleep to ensure the reset is done
time.sleep(1)
current_scan_msg = self.scan_control.scan_msg.get()
try:
self.wait_for_signal(self.scan_control.scan_msg, target_state)
except TimeoutError as exc:
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
) from exc
def callback(*, old_value, value, **kwargs):
if old_value == current_scan_msg and value == target_state:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
# try:
# self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=4)
# except TimeoutError as exc:
# raise TimeoutError(
# f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
# f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
# ) from exc

View File

@@ -37,12 +37,12 @@ def compute_spline(
low_deg = low_deg - POSITION_COMPONSATION
high_deg = high_deg + POSITION_COMPONSATION
if p_kink < 0 or p_kink > 100:
if not (0 <= p_kink <= 100):
raise Mo1UtilsSplineError(
"Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}"
)
if e_kink_deg < low_deg or e_kink_deg > high_deg:
if not (low_deg < e_kink_deg < high_deg):
raise Mo1UtilsSplineError(
"Kink energy not within selected energy range of scan,"
+ f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"

View File

@@ -1,18 +1,20 @@
from __future__ import annotations
from typing import Literal, TYPE_CHECKING, cast
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, StatusBase
from ophyd_devices.sim.sim_signals import SetableSignal
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from debye_bec.devices.nidaq.nidaq_enums import (
NIDAQCompression,
ScanType,
NidaqState,
ScanRates,
ReadoutRange,
EncoderTypes,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
)
if TYPE_CHECKING: # pragma: no cover
@@ -29,69 +31,265 @@ class NidaqControl(Device):
"""Nidaq control class with all PVs"""
### Readback PVs for EpicsEmitter ###
ai0 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI0", kind=Kind.normal, doc="EPICS analog input 0",auto_monitor=True)
ai1 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI1", kind=Kind.normal, doc="EPICS analog input 1",auto_monitor=True)
ai2 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI2", kind=Kind.normal, doc="EPICS analog input 2",auto_monitor=True)
ai3 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI3", kind=Kind.normal, doc="EPICS analog input 3",auto_monitor=True)
ai4 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI4", kind=Kind.normal, doc="EPICS analog input 4",auto_monitor=True)
ai5 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI5", kind=Kind.normal, doc="EPICS analog input 5",auto_monitor=True)
ai6 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI6", kind=Kind.normal, doc="EPICS analog input 6",auto_monitor=True)
ai7 = Cpt(EpicsSignalRO, suffix="NIDAQ-AI7", kind=Kind.normal, doc="EPICS analog input 7",auto_monitor=True)
ai0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI0",
kind=Kind.normal,
doc="EPICS analog input 0",
auto_monitor=True,
)
ai1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI1",
kind=Kind.normal,
doc="EPICS analog input 1",
auto_monitor=True,
)
ai2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI2",
kind=Kind.normal,
doc="EPICS analog input 2",
auto_monitor=True,
)
ai3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI3",
kind=Kind.normal,
doc="EPICS analog input 3",
auto_monitor=True,
)
ai4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI4",
kind=Kind.normal,
doc="EPICS analog input 4",
auto_monitor=True,
)
ai5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI5",
kind=Kind.normal,
doc="EPICS analog input 5",
auto_monitor=True,
)
ai6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI6",
kind=Kind.normal,
doc="EPICS analog input 6",
auto_monitor=True,
)
ai7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI7",
kind=Kind.normal,
doc="EPICS analog input 7",
auto_monitor=True,
)
ci0 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI0", kind=Kind.normal, doc="EPICS counter input 0", auto_monitor=True)
ci1 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI1", kind=Kind.normal, doc="EPICS counter input 1", auto_monitor=True)
ci2 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI2", kind=Kind.normal, doc="EPICS counter input 2", auto_monitor=True)
ci3 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI3", kind=Kind.normal, doc="EPICS counter input 3", auto_monitor=True)
ci4 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI4", kind=Kind.normal, doc="EPICS counter input 4", auto_monitor=True)
ci5 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI5", kind=Kind.normal, doc="EPICS counter input 5", auto_monitor=True)
ci6 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI6", kind=Kind.normal, doc="EPICS counter input 6", auto_monitor=True)
ci7 = Cpt(EpicsSignalRO, suffix="NIDAQ-CI7", kind=Kind.normal, doc="EPICS counter input 7", auto_monitor=True)
ci0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI0",
kind=Kind.normal,
doc="EPICS counter input 0",
auto_monitor=True,
)
ci1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI1",
kind=Kind.normal,
doc="EPICS counter input 1",
auto_monitor=True,
)
ci2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI2",
kind=Kind.normal,
doc="EPICS counter input 2",
auto_monitor=True,
)
ci3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI3",
kind=Kind.normal,
doc="EPICS counter input 3",
auto_monitor=True,
)
ci4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI4",
kind=Kind.normal,
doc="EPICS counter input 4",
auto_monitor=True,
)
ci5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI5",
kind=Kind.normal,
doc="EPICS counter input 5",
auto_monitor=True,
)
ci6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI6",
kind=Kind.normal,
doc="EPICS counter input 6",
auto_monitor=True,
)
ci7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI7",
kind=Kind.normal,
doc="EPICS counter input 7",
auto_monitor=True,
)
di0 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI0", kind=Kind.normal, doc="EPICS digital input 0", auto_monitor=True)
di1 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI1", kind=Kind.normal, doc="EPICS digital input 1", auto_monitor=True)
di2 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI2", kind=Kind.normal, doc="EPICS digital input 2", auto_monitor=True)
di3 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI3", kind=Kind.normal, doc="EPICS digital input 3", auto_monitor=True)
di4 = Cpt(EpicsSignalRO, suffix="NIDAQ-DI4", kind=Kind.normal, doc="EPICS digital input 4", auto_monitor=True)
di0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI0",
kind=Kind.normal,
doc="EPICS digital input 0",
auto_monitor=True,
)
di1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI1",
kind=Kind.normal,
doc="EPICS digital input 1",
auto_monitor=True,
)
di2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI2",
kind=Kind.normal,
doc="EPICS digital input 2",
auto_monitor=True,
)
di3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI3",
kind=Kind.normal,
doc="EPICS digital input 3",
auto_monitor=True,
)
di4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI4",
kind=Kind.normal,
doc="EPICS digital input 4",
auto_monitor=True,
)
enc_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENC",
kind=Kind.normal,
doc="EPICS Encoder reading",
auto_monitor=True,
)
enc_epics = Cpt(EpicsSignalRO, suffix="NIDAQ-ENC", kind=Kind.normal, doc="EPICS Encoder reading", auto_monitor=True)
### Readback for BEC emitter ###
ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN")
ai1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN")
ai2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN")
ai3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN")
ai4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN")
ai5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN")
ai6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN")
ai7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN")
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
)
ai1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
)
ai2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
)
ai3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
)
ai4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
)
ai5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
)
ai6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
)
ai7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
)
ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD")
ai1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD")
ai2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD")
ai3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD")
ai4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD")
ai5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD")
ai6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD")
ai7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD")
ai0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
)
ai1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
)
ai2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
)
ai3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
)
ai4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
)
ai5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
)
ai6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
)
ai7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
)
ci0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN")
ci1_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN")
ci2_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN")
ci3_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN")
ci4_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN")
ci5_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN")
ci6_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN")
ci7_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN")
ci0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
)
ci1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
)
ci2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
)
ci3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
)
ci4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
)
ci5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
)
ci6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
)
ci7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
)
ci0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD")
ci1_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD")
ci2_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD")
ci3_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD")
ci4_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD")
ci5_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD")
ci6_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD")
ci7_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD")
ci0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
)
ci1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
)
ci2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
)
ci3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
)
ci4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
)
ci5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
)
ci6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
)
ci7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
)
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
@@ -135,6 +333,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
@@ -274,11 +473,14 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Default values for signals should be set here.
"""
if not self.wait_for_condition(
condition= lambda: self.state.get() == NidaqState.STANDBY,
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}")
self.scan_duration.set(0).wait()
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -291,22 +493,26 @@ class Nidaq(PSIDeviceBase, NidaqControl):
return None
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY, timeout=self.timeout_wait_for_signal, check_stopped=True
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_type.set(ScanType.TRIGGERED).wait()
self.scan_duration.set(0).wait()
self.stage_call.set(1).wait()
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STAGE, timeout=self.timeout_wait_for_signal, check_stopped=True
condition=lambda: self.state.get() == NidaqState.STAGE,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
)
self.kickoff_call.set(1).wait()
self.kickoff_call.set(1).wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -360,10 +566,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_for_condition(
condition = lambda: self.state.get() == NidaqState.STANDBY,
condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=self.timeout_wait_for_signal,
)
@@ -374,4 +580,4 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_call.set(1).wait()
self.stop_call.put(1)

View File

@@ -2,17 +2,22 @@ import enum
class NIDAQCompression(str, enum.Enum):
""" Options for Compression"""
"""Options for Compression"""
OFF = 0
ON = 1
class ScanType(int, enum.Enum):
""" Triggering options of the backend"""
"""Triggering options of the backend"""
TRIGGERED = 0
CONTINUOUS = 1
class NidaqState(int, enum.Enum):
""" Possible States of the NIDAQ backend"""
"""Possible States of the NIDAQ backend"""
DISABLED = 0
STANDBY = 1
STAGE = 2
@@ -20,8 +25,10 @@ class NidaqState(int, enum.Enum):
ACQUIRE = 4
UNSTAGE = 5
class ScanRates(int, enum.Enum):
""" Sampling Rate options for the backend, in kHZ and MHz"""
"""Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
@@ -31,15 +38,19 @@ class ScanRates(int, enum.Enum):
TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
ONE_V = 0
TWO_V = 1
FIVE_V = 2
TEN_V = 3
class EncoderTypes(int, enum.Enum):
""" Encoder Types"""
"""Encoder Types"""
X_1 = 0
X_2 = 1
X_4 = 2

View File

@@ -0,0 +1,194 @@
"""ES2 Reference Foil Changer"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class Status(int, enum.Enum):
"""Enum class for the status field"""
BOOT = 0
RETRACTED = 1
INSERTED = 2
MOVING = 3
ERROR = 4
class OpMode(int, enum.Enum):
"""Enum class for the Operating Mode field"""
USERMODE = 0
MAINTENANCEMODE = 1
DIAGNOSTICMODE = 2
ERRORMODE = 3
class Reffoilchanger(PSIDeviceBase):
"""Class for the ES2 Reference Foil Changer"""
USER_ACCESS = ["insert"]
inserted = Cpt(
EpicsSignalRO, suffix="ES2-REF:TRY-FilterInserted", kind="config", doc="Inserted indicator"
)
retracted = Cpt(
EpicsSignalRO,
suffix="ES2-REF:TRY-FilterRetracted",
kind="config",
doc="Retracted indicator",
)
moving = Cpt(EpicsSignalRO, suffix="ES2-REF:ROTY.MOVN", kind="config", doc="Moving indicator")
status = Cpt(
EpicsSignal, suffix="ES2-REF:SELN-FilterState-ENUM_RBV", kind="config", doc="Status"
)
op_mode = Cpt(
EpicsSignalWithRBV, suffix="ES2-REF:SELN-OpMode-ENUM", kind="config", doc="Status"
)
ref_set = Cpt(EpicsSignal, suffix="ES2-REF:SELN-SET", kind="config", doc="Requested reference")
ref_rb = Cpt(
EpicsSignalRO, suffix="ES2-REF:SELN-RB", kind="config", doc="Currently set reference"
)
foil01 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL01.DESC", kind="config", doc="Foil 01")
foil02 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL02.DESC", kind="config", doc="Foil 02")
foil03 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL03.DESC", kind="config", doc="Foil 03")
foil04 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL04.DESC", kind="config", doc="Foil 04")
foil05 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL05.DESC", kind="config", doc="Foil 05")
foil06 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL06.DESC", kind="config", doc="Foil 06")
foil07 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL07.DESC", kind="config", doc="Foil 07")
foil08 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL08.DESC", kind="config", doc="Foil 08")
foil09 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL09.DESC", kind="config", doc="Foil 09")
foil10 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL10.DESC", kind="config", doc="Foil 10")
foil11 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL11.DESC", kind="config", doc="Foil 11")
foil12 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL12.DESC", kind="config", doc="Foil 12")
foil13 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL13.DESC", kind="config", doc="Foil 13")
foil14 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL14.DESC", kind="config", doc="Foil 14")
foil15 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL15.DESC", kind="config", doc="Foil 15")
foil16 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL16.DESC", kind="config", doc="Foil 16")
foil17 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL17.DESC", kind="config", doc="Foil 17")
foil18 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL18.DESC", kind="config", doc="Foil 18")
foil19 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL19.DESC", kind="config", doc="Foil 19")
foil20 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL20.DESC", kind="config", doc="Foil 20")
foil21 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL21.DESC", kind="config", doc="Foil 21")
foil22 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL22.DESC", kind="config", doc="Foil 22")
foil23 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL23.DESC", kind="config", doc="Foil 23")
foil24 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL24.DESC", kind="config", doc="Foil 24")
foil25 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL25.DESC", kind="config", doc="Foil 25")
foil26 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL26.DESC", kind="config", doc="Foil 26")
foil27 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL27.DESC", kind="config", doc="Foil 27")
foil28 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL28.DESC", kind="config", doc="Foil 28")
foil29 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL29.DESC", kind="config", doc="Foil 29")
foil30 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL30.DESC", kind="config", doc="Foil 30")
foil31 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL31.DESC", kind="config", doc="Foil 31")
foil32 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL32.DESC", kind="config", doc="Foil 32")
foil33 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL33.DESC", kind="config", doc="Foil 33")
foil34 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL34.DESC", kind="config", doc="Foil 34")
foil35 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL35.DESC", kind="config", doc="Foil 35")
foil36 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL36.DESC", kind="config", doc="Foil 36")
foil37 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL37.DESC", kind="config", doc="Foil 37")
foil38 = Cpt(EpicsSignalRO, suffix="ES-REFFOIL:FOIL38.DESC", kind="config", doc="Foil 38")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.foils = [
self.foil01,
self.foil02,
self.foil03,
self.foil04,
self.foil05,
self.foil06,
self.foil07,
self.foil08,
self.foil09,
self.foil10,
self.foil11,
self.foil12,
self.foil13,
self.foil14,
self.foil15,
self.foil16,
self.foil17,
self.foil18,
self.foil19,
self.foil20,
self.foil21,
self.foil22,
self.foil23,
self.foil24,
self.foil25,
self.foil26,
self.foil27,
self.foil28,
self.foil29,
self.foil30,
self.foil31,
self.foil32,
self.foil33,
self.foil34,
self.foil35,
self.foil36,
self.foil37,
self.foil38,
]
def insert(self, ref: str, wait: bool = False) -> DeviceStatus:
"""Insert a reference
Args:
ref (str) : Desired reference foil name, e.g. Fe or Pt
wait (bool): If you like to wait for the filling to finish. Default False.
"""
filter_number = -1
for i, foil in enumerate(self.foils):
if foil.get() == ref:
filter_number = i + 1
break
if filter_number == -1:
raise ValueError(f"Requested foil ({ref}) is not in list of available foils")
if self.op_mode.get() == OpMode.USERMODE:
self.ref_set.put(filter_number)
def wait_for_status():
return (
(self.status.get() == Status.RETRACTED)
or (self.status.get() == Status.MOVING)
or (
self.ref_rb.get() < (filter_number + 0.2)
and self.ref_rb.get() > (filter_number - 0.2)
)
)
timeout = 3
if not self.wait_for_condition(wait_for_status, timeout=timeout, check_stopped=True):
raise TimeoutError(
f"Reference foil changer did not retract the current foil within {timeout}s"
)
def wait_for_change_finished():
return self.status.get() == Status.INSERTED and self.op_mode == OpMode.USERMODE
# Wait until new reference foil is inserted
status = self.task_handler.submit_task(
task=self.wait_for_condition, task_args=(wait_for_change_finished, 5, True)
)
if wait:
status.wait()
return status
else:
raise DeviceStopError(
f"Reference foil changer must be in User Mode but is in {self.op_mode.get(as_string=True)}"
)

View File

@@ -1 +1,6 @@
from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD, XASAdvancedScan, XASAdvancedScanWithXRD
from .mono_bragg_scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)

View File

@@ -17,7 +17,7 @@ dependencies = ["numpy", "scipy", "bec_lib", "h5py", "ophyd_devices"]
[project.optional-dependencies]
dev = [
"bec_server",
"black ~= 24.0",
"black ~= 25.0",
"isort",
"coverage",
"pylint",

View File

@@ -181,6 +181,7 @@ def test_update_scan_parameters(mock_bragg):
scan_id="my_scan_id",
status="closed",
request_inputs={
"inputs": {},
"kwargs": {
"start": 0,
"stop": 5,
@@ -196,7 +197,7 @@ def test_update_scan_parameters(mock_bragg):
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
}
},
},
info={
"kwargs": {
@@ -236,9 +237,14 @@ def test_kickoff_scan(mock_bragg):
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
assert status.done is False
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
time.sleep(0.2)
assert status.done is True
# TODO MockPV does not support callbacks yet, so we need to improve here #16
# dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
# dev.scan_control.scan_status._read_pv.
# status.wait(timeout=3) # Callback should resolve now
# assert status.done is True
# # dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
# time.sleep(0.2)
# assert status.done is True
assert dev.scan_control.scan_start_timer.get() == 1
dev.scan_control.scan_duration._read_pv.mock_data = 0
@@ -441,138 +447,6 @@ def test_unstage(mock_bragg):
# )
# mock_bragg.scan_info.msg = scan_status_msg
<<<<<<< Updated upstream
# Ensure that ScanControlLoadMessage is set to SUCCESS
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with (
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
mock.patch.object(mock_bragg, "on_unstage"),
):
scan_name = scan_status_msg.content["info"].get("scan_name", "")
# Chek the not implemented fly scan first, should raise Mo1BraggError
if scan_name not in [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
]:
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
else:
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(
mock_bragg, "set_advanced_xas_settings"
) as mock_advanced_xas_settings,
mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
):
# Check xas_simple_scan
if scan_name == "xas_simple_scan":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_simple_scan_with_xrd
elif scan_name == "xas_simple_scan_with_xrd":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_advanced_scan
elif scan_name == "xas_advanced_scan":
mock_bragg.stage()
assert mock_advanced_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.ADVANCED,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_advanced_scan_with_xrd
elif scan_name == "xas_advanced_scan_with_xrd":
mock_bragg.stage()
assert mock_advanced_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.ADVANCED,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
=======
# # Ensure that ScanControlLoadMessage is set to SUCCESS
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
# with (
@@ -701,4 +575,3 @@ def test_unstage(mock_bragg):
# "scan_duration"
# ],
# )
>>>>>>> Stashed changes

View File

@@ -49,6 +49,7 @@ def get_instructions(request, ScanStubStatusMock):
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -70,7 +71,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
@@ -78,6 +79,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -104,7 +106,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -130,7 +132,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -160,6 +162,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
exp_time_high=3,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -181,7 +184,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
@@ -189,6 +192,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -215,7 +219,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -241,7 +245,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -265,6 +269,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
p_kink=50,
e_kink=8500,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -286,7 +291,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
@@ -294,6 +299,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -320,7 +326,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -346,7 +352,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -378,6 +384,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
exp_time_high=3,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -399,7 +406,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
@@ -407,6 +414,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -433,7 +441,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -459,7 +467,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),