39 Commits

Author SHA1 Message Date
gac-x01da
2fd783ad54 Bugfix advanced scan 2025-07-01 09:49:42 +02:00
gac-x01da
34dbc1839d fix test 2025-06-25 16:05:20 +02:00
gac-x01da
db6a9a502f bugfix for when nidaq is stopped early 2025-06-25 15:56:39 +02:00
gac-x01da
e6586ceab2 Make sure that parameters are set before validating 2025-06-25 15:55:54 +02:00
ed6d64c7f9 fix(camera): fix num_rotation_90 for cameras 2025-06-23 16:31:34 +02:00
43e8aea6c8 tests: fix test for MR, mo1_bragg and nidaq tests skipped, check issue 22 2025-06-20 09:30:53 +02:00
abf432f2a9 refactor: fix formatting 2025-06-20 09:19:51 +02:00
gac-x01da
b3672cf5f5 refactor: add compare and transition status for nidaq and mo1_bragg 2025-06-20 09:16:27 +02:00
gac-x01da
fa434794c3 feat(metadata-schema): add metadata schema 2025-06-20 09:16:03 +02:00
gac-x01da
9be74da098 feat(debye-nexus-structure): add first nexus template for debye 2025-06-20 09:15:39 +02:00
gac-x01da
29913cea61 refactor(nidaq-enums): add additional enums for NIDAQ 2025-06-20 09:14:40 +02:00
gac-x01da
881bc9e7a3 refactor(mo1-bragg-device): add Pvs for mono 2025-06-20 09:14:10 +02:00
e941647750 refactor(progress-signal): ProgressSignal for mo1_bragg and nidaq 2025-06-18 14:32:31 +02:00
827557b667 refactor(debye-cam): add preview signal to camera integrations 2025-06-18 14:32:31 +02:00
gac-x01da
a9fd62d249 feat: add initial file structure 2025-06-18 14:29:51 +02:00
27ff5697af fix(device-configs): change to relative path for !include syntax 2025-06-18 14:23:19 +02:00
07d05f9490 fix(nidaq): fix proper handling return of DeviceStatus for complete method 2025-06-18 14:23:19 +02:00
gac-x01da
39adeb72de test: add test for nidaq continous scan 2025-06-18 14:23:19 +02:00
gac-x01da
bc666dc807 fix: double timeout for for wait for ScanControlMessage 2025-06-18 14:23:19 +02:00
gac-x01da
89cc27a8da feat: add nidaq_continuous_scan scan 2025-06-18 14:23:19 +02:00
gac-x01da
718a001a8a typo and adding pinhole motors 2025-06-18 14:23:19 +02:00
gac-x01da
f038679d76 refactor(nidaq): add energy pv from nidaq 2025-06-18 14:23:19 +02:00
gac-x01da
a1433efbf8 update of test_config to include focusing mirror 2025-06-18 14:23:19 +02:00
gac-x01da
79ead32e79 feat(nidaq): ensure nidaq is powered on during on_connected 2025-06-18 14:23:19 +02:00
gac-x01da
c934aa8e9a refactor: update configs with optic slit config, machine config 2025-06-18 14:23:19 +02:00
0d87e958d0 Update copier template source to github 2025-06-11 16:26:24 +02:00
e4556ad90e refactor: migrate debye_bec to copier template 2025-05-16 15:21:04 +02:00
ci_update_bot
da89f9287c docs: Update device list 2025-05-16 13:19:11 +00:00
665c290a90 test(test-camers): Add unit tests for camera integration of prosilica and basler cameras 2025-05-16 15:14:17 +02:00
gac-x01da
10b0608d31 fix: debug with devices, throttle live_view to 1Hz 2025-05-16 15:14:17 +02:00
ca2cf40d6a refactor: refactor basler and prosilica cameras 2025-05-16 15:14:17 +02:00
415c601d2a feat(debye-base-cam): introduce base class for cameras at debye 2025-05-16 15:14:17 +02:00
ci_update_bot
0c1f41cd7c docs: Update device list 2025-05-16 13:08:16 +00:00
gac-x01da
0cdad97d00 refactor: add mo1_bragg_angle to x01da_test_config 2025-05-16 15:02:28 +02:00
b3f63f4f76 tests(mo1-bragg-angle): add tests for mo1_bragg_angle 2025-05-09 16:26:00 +02:00
87ea95e975 fix: improve move method of mo1_bragg_devices 2025-05-09 16:25:41 +02:00
4d9a062b8c tests: fix tests and formatting 2025-05-09 16:00:07 +02:00
gac-x01da
c782324065 feat(mo1-bragg-angle): add Mo1BraggAngle positioner class; closes #14 2025-05-09 08:41:44 +02:00
gac-x01da
5bb0df2ddf refactor(mo1-bragg-positioner): remove move_type, cleanupt of angle pvs in mo1_bragg_positioner 2025-05-09 08:36:26 +02:00
42 changed files with 1855 additions and 491 deletions

9
.copier-answers.yml Normal file
View File

@@ -0,0 +1,9 @@
# Do not edit this file!
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: debye_bec
widget_plugins_input: []

View File

@@ -1,7 +1,7 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "debye"
target: "debye_bec"
branch: $CHILD_PIPELINE_BRANCH
- file: /templates/plugin-repo-template.yml
inputs:
name: debye_bec
target: debye_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils

View File

@@ -1,6 +1,7 @@
BSD 3-Clause License
Copyright (c) 2024, Paul Scherrer Institute
Copyright (c) 2025, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -25,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1
bin/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
# Add anything you don't want to check in to git, e.g. very large files

View File

@@ -10,7 +10,7 @@ While command-line arguments have to be set in the pre-startup script, the
post-startup script can be used to load beamline specific information and
to setup the prompts.
from bec_lib import bec_logger
from bec_lib.logger import bec_logger
logger = bec_logger.logger

View File

@@ -0,0 +1,13 @@
curr:
readoutPriority: baseline
description: SLS ring current
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: AGEBD-DBPM3CURR:CURRENT-AVG
deviceTags:
- machine
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false

View File

@@ -0,0 +1,227 @@
## Optics Slits 1 -- Physical positioners
sl1_trxr:
readoutPriority: baseline
description: Optics slits 1 X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_trxw:
readoutPriority: baseline
description: Optics slits 1 X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_tryb:
readoutPriority: baseline
description: Optics slits 1 Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_tryt:
readoutPriority: baseline
description: Optics slits 1 X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
bm1_try:
readoutPriority: baseline
description: Beam Monitor 1 Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-BM1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
## Optics Slits 1 -- Virtual positioners
sl1_centerx:
readoutPriority: baseline
description: Optics slits 1 X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_gapx:
readoutPriority: baseline
description: Optics slits 1 X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_centery:
readoutPriority: baseline
description: Optics slits 1 Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl1_gapy:
readoutPriority: baseline
description: Optics slits 1 Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
## Optics Slits 2 -- Physical positioners
sl2_trxr:
readoutPriority: baseline
description: Optics slits 2 X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_trxw:
readoutPriority: baseline
description: Optics slits 2 X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_tryb:
readoutPriority: baseline
description: Optics slits 2 Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_tryt:
readoutPriority: baseline
description: Optics slits 2 X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
bm2_try:
readoutPriority: baseline
description: Beam Monitor 2 Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-BM2:TRY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
## Optics Slits 2 -- Virtual positioners
sl2_centerx:
readoutPriority: baseline
description: Optics slits 2 X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_gapx:
readoutPriority: baseline
description: Optics slits 2 X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_centery:
readoutPriority: baseline
description: Optics slits 2 Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits
sl2_gapy:
readoutPriority: baseline
description: Optics slits 2 Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
deviceTags:
- optics
- slits

View File

@@ -1,4 +1,7 @@
optic_slit_config:
- !include ./x01da_optic_slits.yaml
machine_config:
- !include ./x01da_machine.yaml
## Slit Diaphragm -- Physical positioners
sldi_trxr:
readoutPriority: baseline
@@ -210,12 +213,12 @@ mo1_bragg:
onFailure: retry
enabled: true
softwareTrigger: false
dummy_pv:
readoutPriority: monitored
description: Heartbeat of Bragg
deviceClass: ophyd.EpicsSignalRO
mo1_bragg_angle:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
deviceConfig:
read_pv: "X01DA-OP-MO1:BRAGG:heartbeat_RBV"
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
@@ -247,7 +250,7 @@ mo_trx:
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:TRY
prefix: X01DA-OP-MO1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
@@ -261,6 +264,120 @@ mo_roty:
enabled: true
softwareTrigger: false
## Focusing Mirror -- Physical Positioners
fm_trxu:
readoutPriority: baseline
description: Focusing Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
fm_trxd:
readoutPriority: baseline
description: Focusing Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryd:
readoutPriority: baseline
description: Focusing Mirror Y-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYD
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryur:
readoutPriority: baseline
description: Focusing Mirror Y-translation upstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYUR
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryuw:
readoutPriority: baseline
description: Focusing Mirror Y-translation upstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYUW
onFailure: retry
enabled: true
softwareTrigger: false
fm_bnd:
readoutPriority: baseline
description: Focusing Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:BND
onFailure: retry
enabled: true
softwareTrigger: false
## Focusing Mirror -- Virtual Positioners
fm_rotx:
readoutPriority: baseline
description: Focusing Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
fm_roty:
readoutPriority: baseline
description: Focusing Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
fm_rotz:
readoutPriority: baseline
description: Focusing Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
fm_xctp:
readoutPriority: baseline
description: Focusing Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
fm_ytcp:
readoutPriority: baseline
description: Focusing Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
fm_ztcp:
readoutPriority: baseline
description: Focusing Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
# Ionization Chambers
ic0:
@@ -337,15 +454,15 @@ reffoilchanger:
# enabled: true
# softwareTrigger: false
# xray_eye:
# readoutPriority: async
# description: X-ray eye
# deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
# deviceConfig:
# prefix: "X01DA-ES-XRAYEYE:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
xray_eye:
readoutPriority: async
description: X-ray eye
deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
deviceConfig:
prefix: "X01DA-ES-XRAYEYE:"
onFailure: retry
enabled: true
softwareTrigger: false
# Pilatus Curtain
# pilatus_curtain:
@@ -471,4 +588,49 @@ es1_alignment_laser:
onFailure: retry
enabled: true
softwareTrigger: false
## Pinhole alignment stages -- Physical Positioners
pin1_trx:
readoutPriority: baseline
description: Pinhole X-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
tags: Endstation
pin1_try:
readoutPriority: baseline
description: Pinhole Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
tags: Endstation
pin1_rotx:
readoutPriority: baseline
description: Pinhole X-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
tags: Endstation
pin1_roty:
readoutPriority: baseline
description: Pinhole Y-rotation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES1-PIN1:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
tags: Endstation

View File

@@ -1,43 +1,36 @@
"""Basler camera class for Debye BEC."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Component as Cpt
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class BaslerCamBase(ADBase):
"""BaslerCam Base class."""
cam1 = ADCpt(AravisDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, "image1:")
class BaslerCam(PSIDeviceBase, BaslerCamBase):
class BaslerCam(DebyeBaseCamera, BaslerCamBase):
"""Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:"""
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
# self.preview_2d.put(data)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=3,
doc="Preview signal for the camera.",
)

View File

@@ -0,0 +1,138 @@
"""Base class for Camera integration at Debye."""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase
from ophyd_devices import PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
logger = bec_logger.logger
class DebyeBaseCamera(PSIDeviceBase):
"""Base class for Debye cameras."""
USER_ACCESS = ["live_mode"]
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=-1,
doc="Preview signal for the camera.",
)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.image1: "ImagePlugin_V35"
self._update_frequency = 1 # Hz
self._live_mode = False
self._live_mode_event = None
self._task_status = None
@property
def live_mode(self) -> bool:
"""Live mode status."""
return self._live_mode
@typechecked
@live_mode.setter
def live_mode(self, value: bool) -> None:
"""
Set the live mode status.
Args:
value (bool): True to enable live mode, False to disable.
"""
if value == self._live_mode:
return
self._live_mode = value
if value:
self._start_live_mode()
else:
self._stop_live_mode()
def _start_live_mode(self) -> None:
"""Start live mode."""
if self._live_mode_event is not None: # Kill task if it exists
self._live_mode_event.set()
self._live_mode_event = None
if self._task_status is not None:
self.task_handler.kill_task(task_status=self._task_status)
self._task_status = None
self._live_mode_event = threading.Event()
self._task_status = self.task_handler.submit_task(task=self.emit_to_bec)
def _stop_live_mode(self) -> None:
"""Stop live mode."""
if self._live_mode_event is not None:
self._live_mode_event.set()
self._live_mode_event = None
def emit_to_bec(self):
"""Emit the image data to BEC. If _live_mode_event is set, stop the task."""
while not self._live_mode_event.wait(1 / self._update_frequency):
value = self.image1.array_data.get()
if value is None:
continue
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
self.preview.put(data)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.live_mode = True
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""

View File

@@ -1,41 +1,39 @@
"""Prosilica camera class for integration of beam_monitor 1/2 cameras."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Component as Cpt
from ophyd import Device
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class ProsilicaCamBase(ADBase):
"""Base class for Prosilica cameras."""
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, "image1:")
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase):
"""
Prosilica camera class, for integration of beam_monitor 1/2 cameras.
Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01:
"""
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=3,
doc="Preview signal for the camera.",
)

View File

@@ -3,8 +3,9 @@
### debye_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| BaslerCam | | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| BaslerCamBase | | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| BaslerCam | Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE: | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| BaslerCamBase | BaslerCam Base class. | [debye_bec.devices.cameras.basler_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/basler_cam.py) |
| DebyeBaseCamera | Base class for Debye cameras. | [debye_bec.devices.cameras.debye_base_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/debye_base_cam.py) |
| ES0Filter | Class for the ES0 filter station X01DA-ES0-FI: | [debye_bec.devices.es0filter](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/es0filter.py) |
| GasMixSetup | Class for the ES2 Pilatus Curtain | [debye_bec.devices.pilatus_curtain](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/pilatus_curtain.py) |
| GasMixSetupControl | GasMixSetup Control for Inonization Chamber 0 | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
@@ -13,16 +14,17 @@
| IonizationChamber1 | Ionization Chamber 1, prefix should be 'X01DA-'. | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| IonizationChamber2 | Ionization Chamber 2, prefix should be 'X01DA-'. | [debye_bec.devices.ionization_chambers.ionization_chamber](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/ionization_chambers/ionization_chamber.py) |
| Mo1Bragg | Mo1 Bragg motor for the Debye beamline.<br><br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:<br> | [debye_bec.devices.mo1_bragg.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg.py) |
| Mo1BraggAngle | Positioner implementation with readback angle of the MO1 Bragg positioner. | [debye_bec.devices.mo1_bragg.mo1_bragg_angle](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py) |
| Mo1BraggCalculator | Mo1 Bragg PVs to convert angle to energy or vice-versa. | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggCrystal | Mo1 Bragg PVs to set the crystal parameters | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggEncoder | Mo1 Bragg PVs to communicate with the encoder | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggPositioner | <br> Positioner implementation of the MO1 Bragg positioner.<br><br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:<br> This soft IOC connects to the NI motor and its control loop.<br> | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggPositioner | <br> Positioner implementation with readback energy of the MO1 Bragg positioner.<br><br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:<br> This soft IOC connects to the NI motor and its control loop.<br> | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggScanControl | Mo1 Bragg PVs to control the scan after setting the parameters. | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggScanSettings | Mo1 Bragg PVs to set the scan setttings | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1BraggStatus | Mo1 Bragg PVs for status monitoring | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Mo1TriggerSettings | Mo1 Trigger settings | [debye_bec.devices.mo1_bragg.mo1_bragg_devices](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py) |
| Nidaq | NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05<br><br> Args:<br> prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:<br> name (str) : Name of the device<br> scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.<br> | [debye_bec.devices.nidaq.nidaq](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/nidaq/nidaq.py) |
| NidaqControl | Nidaq control class with all PVs | [debye_bec.devices.nidaq.nidaq](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/nidaq/nidaq.py) |
| ProsilicaCam | | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| ProsilicaCamBase | | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| ProsilicaCam | <br> Prosilica camera class, for integration of beam_monitor 1/2 cameras.<br> Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01:<br> | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| ProsilicaCamBase | Base class for Prosilica cameras. | [debye_bec.devices.cameras.prosilica_cam](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/cameras/prosilica_cam.py) |
| Reffoilchanger | Class for the ES2 Reference Foil Changer | [debye_bec.devices.reffoilchanger](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/reffoilchanger.py) |

View File

@@ -15,7 +15,8 @@ from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
@@ -81,7 +82,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
"""
USER_ACCESS = ["set_advanced_xas_settings"]
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
@@ -93,7 +96,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.timeout_for_pvwait = 2.5
self.timeout_for_pvwait = 7.5
########################################
# Beamline Specific Implementations #
@@ -120,7 +123,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self._check_scan_msg(ScanControlLoadMessage.PENDING)
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
@@ -198,14 +205,16 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
)
else:
return
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
logger.info(f"Sleeping for one second")
time.sleep(1)
logger.info(f"Device {self.name}, done sleeping")
# Load the scan parameters to the controller
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
self.cancel_on_stop(status)
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.SUCCESS,
timeout=self.timeout_for_pvwait,
)
# Wait for params to be checked from controller
status.wait(self.timeout_for_pvwait)
return None
def on_unstage(self) -> DeviceStatus | StatusBase | None:
@@ -213,32 +222,28 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False
current_state = self.scan_control.scan_msg.get()
# Case 1, message is already ScanControlLoadMessage.PENDING
if current_state == ScanControlLoadMessage.PENDING:
return None
# Case 2, probably called after scan, backend should resolve on its own. Timeout to wait
if current_state in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
if self.scan_control.scan_msg.get() in [
ScanControlLoadMessage.STARTED,
ScanControlLoadMessage.SUCCESS,
]:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
try:
self.wait_for_signal(
self.scan_control.scan_msg,
ScanControlLoadMessage.PENDING,
timeout=self.timeout_for_pvwait,
)
return
except TimeoutError:
status.wait(2)
return None
except WaitTimeoutError:
logger.warning(
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
def callback(*, old_value, value, **kwargs):
if value == ScanControlLoadMessage.PENDING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
else:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
return None
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
@@ -249,20 +254,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
"""Wait for the scan to complete. No timeout is set."""
start_time = time.time()
while True:
if self.stopped is True:
raise DeviceStopError(
f"Device {self.name} was stopped while waiting for scan to complete"
)
if self.scan_control.scan_done.get() == 1:
return
time.sleep(0.1)
status = self.task_handler.submit_task(wait_for_complete)
status = CompareStatus(self.scan_control.scan_done, 1)
self.cancel_on_stop(status)
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
@@ -274,13 +267,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
def callback(*, old_value, value, **kwargs):
if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_status, callback=callback)
status = TransitionStatus(
self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True,
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
)
self.cancel_on_stop(status)
start_func(1)
return status
@@ -289,9 +282,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
# FIXME this should become the ProgressSignal
# pylint: disable=unused-argument
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
@@ -300,12 +290,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
value (int) : current progress value
"""
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(max_value == value),
)
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.
@@ -315,30 +300,20 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
"""
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
self.scan_settings.s_scan_energy_lo.put(low)
self.scan_settings.s_scan_energy_hi.put(high)
else:
self.scan_settings.s_scan_angle_lo.put(low)
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def wait_for_signal(self, signal: Cpt, value: Any, timeout: float | None = None) -> None:
"""Wait for a signal to reach a certain value."""
if timeout is None:
timeout = self.timeout_for_pvwait
start_time = time.time()
while time.time() - start_time < timeout:
if signal.get() == value:
return None
if self.stopped is True: # Should this check be optional or configurable?!
raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal")
time.sleep(0.1)
# If we end up here, the status did not resolve
raise TimeoutError(
f"Device {self.name} run into timeout after {timeout}s for signal {signal.name} with value {signal.get()}, expected {value}"
)
status_list = []
status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
@typechecked
def convert_angle_energy(
@@ -355,15 +330,19 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
self.wait_for_signal(self.calculator.calc_done, 0)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
self.wait_for_signal(self.calculator.calc_done, 1)
time.sleep(0.25) # Needed due to update frequency of softIOC
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.15)
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
@@ -381,18 +360,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
"""
# TODO Add fallback solution for automatic testing, otherwise test will fail
# because no monochromator will calculate the angle
# Unsure how to implement this
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
else:
raise Mo1BraggError("MoveType Angle not implemented for advanced scans, use Energy")
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
p_kink = 100 - p_kink
pos, vel, dt = compute_spline(
low_deg=low_deg,
@@ -402,9 +374,19 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
scan_time=scan_time,
)
self.scan_settings.a_scan_pos.set(pos)
self.scan_settings.a_scan_vel.set(vel)
self.scan_settings.a_scan_time.set(dt)
status_list = []
status_list.append(self.scan_settings.a_scan_pos.set(pos))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_vel.set(vel))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_time.set(dt))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_trig_settings(
self,
@@ -427,12 +409,30 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
self.scan_settings.trig_ena_hi_enum.put(int(enable_high))
self.scan_settings.trig_ena_lo_enum.put(int(enable_low))
self.scan_settings.trig_time_hi.put(exp_time_high)
self.scan_settings.trig_time_lo.put(exp_time_low)
self.scan_settings.trig_every_n_hi.put(cycle_high)
self.scan_settings.trig_every_n_lo.put(cycle_low)
status_list = []
status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
@@ -442,8 +442,18 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
scan_duration (float): Duration of the scan
"""
val = ScanControlMode(mode).value
self.scan_control.scan_mode_enum.put(val)
self.scan_control.scan_duration.put(scan_duration)
status_list = []
status_list.append(self.scan_control.scan_mode_enum.set(val))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_control.scan_duration.set(scan_duration))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
@@ -453,39 +463,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
"""Check if the scan message is gettting available
Args:
target_state (ScanControlLoadMessage): Target state to check for
Raises:
TimeoutError: If the scan message is not available after the timeout
"""
try:
self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=1)
except TimeoutError as exc:
logger.warning(
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(self.scan_control.scan_msg.get())}, "
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s"
)
current_scan_msg = self.scan_control.scan_msg.get()
def callback(*, old_value, value, **kwargs):
if old_value == current_scan_msg and value == target_state:
return True
return False
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
# try:
# self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=4)
# except TimeoutError as exc:
# raise TimeoutError(
# f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
# f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
# ) from exc

View File

@@ -0,0 +1,20 @@
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
class Mo1BraggAngle(Mo1BraggPositioner):
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
@property
def egu(self) -> str:
"""Return the engineering unit of the positioner."""
return "deg"

View File

@@ -82,10 +82,20 @@ class Mo1BraggCrystal(Device):
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
current_d_spacing = Cpt(
EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
)
current_offset = Cpt(
EpicsSignalRO, suffix="current_offset_RBV", kind="normal", auto_monitor=True
)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
)
current_xtal_string = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
)
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""
@@ -189,13 +199,13 @@ class Mo1BraggScanControl(Device):
class Mo1BraggPositioner(Device, PositionerBase):
"""
Positioner implementation of the MO1 Bragg positioner.
Positioner implementation with readback energy of the MO1 Bragg positioner.
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
This soft IOC connects to the NI motor and its control loop.
"""
USER_ACCESS = ["set_advanced_xas_settings"]
USER_ACCESS = ["set_xtal"]
####### Sub-components ########
# Namespace is cleaner and easier to maintain
@@ -207,10 +217,6 @@ class Mo1BraggPositioner(Device, PositionerBase):
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
############# switch between energy and angle #############
# TODO should be removed/replaced once decision about pseudo motor is made
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config")
############# Energy PVs #############
readback = Cpt(
@@ -226,22 +232,6 @@ class Mo1BraggPositioner(Device, PositionerBase):
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
########### Angle PVs #############
# TODO Pseudo motor for angle?
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True
)
low_limit_angle = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
high_limit_angle = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
########## Move Command PVs ##########
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
@@ -271,9 +261,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
success (bool) : Flag to indicate if the motion was successful
"""
self.move_stop.put(1)
if self._move_thread is not None:
self._move_thread.join()
self._move_thread = None
self._stopped = True
super().stop(success=success)
def stop_scan(self) -> None:
@@ -290,9 +278,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
if self.move_type.get() == MoveType.ENERGY:
return (self.low_lim.get(), self.high_lim.get())
return (self.low_limit_angle.get(), self.high_limit_angle.get())
return (self.low_lim.get(), self.high_lim.get())
@property
def low_limit(self) -> float:
@@ -307,16 +293,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
if self.move_type.get() == MoveType.ENERGY:
return "eV"
return "deg"
return "eV"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
move_type = self.move_type.get()
move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
return move_cpt.get()
return self.readback.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
@@ -332,7 +314,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
raise LimitError(f"position={value} not within limits {self.limits}")
def _move_and_finish(
self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1
self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
) -> None:
"""
Method to be called in the move thread to move the Bragg positioner
@@ -349,12 +331,14 @@ class Mo1BraggPositioner(Device, PositionerBase):
update_frequency (float): Optional, frequency to update the current position of
the motion, defaults to 0.1s
"""
motor_name = None
try:
# Set the target position on IOC
move_cpt.put(target_pos)
self.setpoint.put(target_pos)
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
motor_name = self.name
while self.motor_is_moving.get() == 0:
if self.stopped:
raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
@@ -364,10 +348,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
logger.error(
f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
)
status.set_exception(exc=exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
def move(self, value: float, **kwargs) -> DeviceStatus:
"""
Move the Bragg positioner to the specified value, allows to
switch between move types angle and energy.
@@ -381,16 +367,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
DeviceStatus : status object to track the motion
"""
self._stopped = False
if move_type is not None:
self.move_type.put(move_type)
move_type = self.move_type.get()
move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, move_cpt, status, 0.1)
target=self._move_and_finish, args=(value, status, 0.1)
)
self._move_thread.start()
return status

View File

@@ -1,15 +1,18 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from debye_bec.devices.nidaq.nidaq_enums import (
EncoderTypes,
EncoderFactors,
NIDAQCompression,
NidaqState,
ReadoutRange,
@@ -189,6 +192,14 @@ class NidaqControl(Device):
auto_monitor=True,
)
energy_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENERGY",
kind=Kind.normal,
doc="EPICS Energy reading",
auto_monitor=True,
)
### Readback for BEC emitter ###
ai0_mean = Cpt(
@@ -298,6 +309,8 @@ class NidaqControl(Device):
di4_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 4, MAX")
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
### Control PVs ###
@@ -311,11 +324,14 @@ class NidaqControl(Device):
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config)
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
@@ -328,10 +344,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_config"]
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self.valid_scan_names = [
@@ -339,6 +358,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
########################################
@@ -443,12 +463,20 @@ class Nidaq(PSIDeviceBase, NidaqControl):
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in "X_1":
self.encoder_type.put(EncoderTypes.X_1)
elif encoder_type in "X_2":
self.encoder_type.put(EncoderTypes.X_2)
elif encoder_type in "X_4":
self.encoder_type.put(EncoderTypes.X_4)
if encoder_type in "1/16":
self.encoder_factor.put(EncoderFactors.X1_16)
elif encoder_type in "1/8":
self.encoder_factor.put(EncoderFactors.X1_8)
elif encoder_type in "1/4":
self.encoder_factor.put(EncoderFactors.X1_4)
elif encoder_type in "1/2":
self.encoder_factor.put(EncoderFactors.X1_2)
elif encoder_type in "1":
self.encoder_factor.put(EncoderFactors.X1)
elif encoder_type in "2":
self.encoder_factor.put(EncoderFactors.X2)
elif encoder_type in "4":
self.encoder_factor.put(EncoderFactors.X4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
@@ -472,15 +500,23 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
try:
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
except WaitTimeoutError:
logger.warning(f"Device {self.name} was not alive, trying to put power on")
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
self.power.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.time_left.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -492,41 +528,52 @@ class Nidaq(PSIDeviceBase, NidaqControl):
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if self.state.get() != NidaqState.STANDBY:
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
self.on_stop()
status.wait(timeout=self.timeout_wait_for_signal)
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STAGE,
timeout=self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
timeout=self._timeout_wait_for_pv
)
self.kickoff_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
timeout=self._timeout_wait_for_pv
)
# Stage call to IOC
status = CompareStatus(self.state, NidaqState.STAGE)
self.cancel_on_stop(status)
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_kickoff(self) -> DeviceStatus | StatusBase:
"""Kickoff the Nidaq"""
status = self.kickoff_call.set(1)
self.cancel_on_stop(status)
return status
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
def _get_state():
return self.state.get() == NidaqState.STANDBY
if not self.wait_for_condition(
condition=_get_state, timeout=self.timeout_wait_for_signal, check_stopped=False
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
status = self.enable_compression.set(1)
self.cancel_on_stop(status)
status.wait(self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
@@ -540,15 +587,13 @@ class Nidaq(PSIDeviceBase, NidaqControl):
if not self._check_if_scan_name_is_valid():
return None
def _wait_for_state():
return self.state.get() == NidaqState.KICKOFF
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
if not self.wait_for_condition(
_wait_for_state, timeout=self.timeout_wait_for_signal, check_stopped=True
):
raise NidaqError(
f"Device {self.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.state.get())}"
)
status = CompareStatus(self.state, NidaqState.KICKOFF)
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
)
@@ -565,18 +610,26 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"""
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
# TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=self.timeout_wait_for_signal,
)
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
max_value = scan_duration
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def on_stop(self) -> None:
"""Called when the device is stopped."""

View File

@@ -48,9 +48,13 @@ class ReadoutRange(int, enum.Enum):
TEN_V = 3
class EncoderTypes(int, enum.Enum):
"""Encoder Types"""
class EncoderFactors(int, enum.Enum):
"""Encoder Factors"""
X_1 = 0
X_2 = 1
X_4 = 2
X1_16 = 0
X1_8 = 1
X1_4 = 2
X1_2 = 3
X1 = 4
X2 = 5
X4 = 6

View File

@@ -0,0 +1 @@
from .debye_nexus_structure import DebyeNexusStructure

View File

@@ -0,0 +1,125 @@
from bec_server.file_writer.default_writer import DefaultFormat
class DebyeNexusStructure(DefaultFormat):
"""Nexus Structure for Debye"""
def format(self) -> None:
"""Specify the file format for the file writer."""
entry = self.storage.create_group(name="entry")
entry.attrs["NX_class"] = "NXentry"
instrument = entry.create_group(name="instrument")
instrument.attrs["NX_class"] = "NXinstrument"
###################
## mo1_bragg specific information
###################
# Logic if device exist
if "mo1_bragg" in self.device_manager.devices:
monochromator = instrument.create_group(name="monochromator")
monochromator.attrs["NX_class"] = "NXmonochromator"
crystal = monochromator.create_group(name="crystal")
crystal.attrs["NX_class"] = "NXcrystal"
# Create a dataset
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
chemical_formular.attrs["NX_class"] = "NX_CHAR"
# Create a softlink
d_spacing = crystal.create_soft_link(
name="d_spacing",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
)
d_spacing.attrs["NX_class"] = "NX_FLOAT"
offset = crystal.create_soft_link(
name="offset",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
)
offset.attrs["NX_class"] = "NX_FLOAT"
reflection = crystal.create_soft_link(
name="reflection",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
)
reflection.attrs["NX_class"] = "NX_CHAR"
##################
## cm mirror specific information
###################
collimating_mirror = instrument.create_group(name="collimating_mirror")
collimating_mirror.attrs["NX_class"] = "NXmirror"
cm_substrate_material = collimating_mirror.create_dataset(
name="substrate_material", data="Si"
)
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
cm_bending_radius = collimating_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
)
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
cm_bending_radius.attrs["units"] = "km"
cm_incidence_angle = collimating_mirror.create_soft_link(
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
)
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle = collimating_mirror.create_soft_link(
name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
)
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
##################
## fm mirror specific information
###################
focusing_mirror = instrument.create_group(name="focusing_mirror")
focusing_mirror.attrs["NX_class"] = "NXmirror"
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
fm_bending_radius = focusing_mirror.create_soft_link(
name="sagittal radius",
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
)
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
fm_incidence_angle = focusing_mirror.create_soft_link(
name="incidence angle",
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
)
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
fm_yaw_angle = focusing_mirror.create_soft_link(
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
)
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
fm_roll_angle = focusing_mirror.create_soft_link(
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
)
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
##################
## source specific information
###################
source = instrument.create_group(name="source")
source.attrs["NX_class"] = "NXsource"
beamline_name = source.create_dataset(name="beamline_name", data="Debye")
beamline_name.attrs["NX_class"] = "NX_CHAR"
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
facility_name.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data="X-ray")
probe.attrs["NX_class"] = "NX_CHAR"

View File

@@ -4,3 +4,4 @@ from .mono_bragg_scans import (
XASSimpleScan,
XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import NIDAQContinuousScan

View File

@@ -0,0 +1,12 @@
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
# "example_scan": ExampleSchema
}
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None

View File

@@ -0,0 +1,34 @@
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
# # supplied to a scan must be validated.
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
# # but by default it will still allow you to add any arbitrary information to it.
# # That is to say, when you run a scan with which such a model has been associated in the
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
# # and built-in python types (strings, integers, floats) as values, and these will be
# # added to the experiment metadata, but it *must* contain the keys and values of the
# # types defined in the schema class.
# #
# #
# # For example, say that you would like to enforce recording information about sample
# # pretreatment, you could define the following:
# #
#
# from bec_lib.metadata_schema import BasicScanMetadata
#
#
# class ExampleSchema(BasicScanMetadata):
# treatment_description: str
# treatment_temperature_k: int
#
#
# # If this was used according to the example in metadata_schema_registry.py,
# # then when calling the scan, the user would need to write something like:
# >>> scans.example_scan(
# >>> motor,
# >>> 1,
# >>> 2,
# >>> 3,
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
# >>> )
#
# # And the additional metadata would be saved in the HDF5 file created for the scan.

View File

@@ -0,0 +1,8 @@
from bec_lib.metadata_schema import BasicScanMetadata
#
#
class xas_simple_scan_schema(BasicScanMetadata):
Edge: str
Element: str

View File

@@ -72,11 +72,7 @@ class XASSimpleScan(AsyncFlyScanBase):
yield None
def pre_scan(self):
"""Pre Scan action. Ensure the motor movetype is set to energy, then check
limits for start/end energy.
#TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor.
"""
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.

View File

@@ -0,0 +1,84 @@
"""This module contains the scan class for the nidaq of the Debye beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
def __init__(
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
):
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
"""
super().__init__(**kwargs)
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id

View File

@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "debye_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
@@ -16,13 +16,14 @@ dependencies = ["numpy", "scipy", "bec_lib", "h5py", "ophyd_devices"]
[project.optional-dependencies]
dev = [
"bec_server",
"black ~= 25.0",
"black",
"copier",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"bec_server",
]
[project.entry-points."bec"]
@@ -37,12 +38,18 @@ plugin_file_writer = "debye_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "debye_bec.scans"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "debye_bec.scans.metadata_schema"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "debye_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "debye_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets"]
plugin_widgets = "debye_bec.bec_widgets"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "debye_bec.bec_widgets.auto_updates"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "debye_bec.bec_widgets.widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,70 @@
"""Module to test prosilica and Basler cam integrations."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam, ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.cameras.basler_cam import BaslerCam
from debye_bec.devices.cameras.prosilica_cam import ProsilicaCam
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="function")
def mock_basler():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = BaslerCam(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_basler_init(mock_basler):
"""Test the initialization of the Basler camera device."""
assert mock_basler.name == "cam"
assert mock_basler.prefix == "test:"
assert isinstance(mock_basler.cam1, AravisDetectorCam)
assert isinstance(mock_basler.image1, ImagePlugin_V35)
assert mock_basler._update_frequency == 1
assert mock_basler._live_mode is False
assert mock_basler._live_mode_event is None
assert mock_basler._task_status is None
assert mock_basler.preview.ndim == 2
assert mock_basler.preview.num_rotation_90 == 3
@pytest.fixture(scope="function")
def mock_prosilica():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = ProsilicaCam(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_prosilica_init(mock_prosilica):
"""Test the initialization of the Prosilica camera device."""
assert mock_prosilica.name == "cam"
assert mock_prosilica.prefix == "test:"
assert isinstance(mock_prosilica.cam1, ProsilicaDetectorCam)
assert isinstance(mock_prosilica.image1, ImagePlugin_V35)
assert mock_prosilica._update_frequency == 1
assert mock_prosilica._live_mode is False
assert mock_prosilica._live_mode_event is None
assert mock_prosilica._task_status is None
assert mock_prosilica.preview.ndim == 2
assert mock_prosilica.preview.num_rotation_90 == 3

View File

@@ -0,0 +1,86 @@
"""Module to test camera base integration class for Debye."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="function")
def mock_cam():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DebyeBaseCamera(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_cam):
"""Test the initialization of the camera device."""
assert mock_cam.name == "cam"
assert mock_cam.prefix == "test:"
assert mock_cam._update_frequency == 1
assert mock_cam._live_mode is False
assert mock_cam._live_mode_event is None
assert mock_cam._task_status is None
assert mock_cam.preview.ndim == 2
assert mock_cam.preview.num_rotation_90 == -1
def test_start_live_mode(mock_cam):
"""Test starting live mode."""
def mock_emit_to_bec(*args, **kwargs):
"""Mock emit_to_bec method."""
while not mock_cam._live_mode_event.wait(1 / mock_cam._update_frequency):
pass
with mock.patch.object(mock_cam, "emit_to_bec", side_effect=mock_emit_to_bec):
mock_cam._start_live_mode()
assert mock_cam._live_mode_event is not None
assert mock_cam._task_status is not None
assert mock_cam._task_status.state == "running"
mock_cam._live_mode_event.set()
# Wait for the task to resolve
mock_cam._task_status.wait(timeout=5)
assert mock_cam._task_status.done is True
def test_stop_live_mode(mock_cam):
"""Test stopping live mode."""
with mock.patch.object(mock_cam, "_live_mode_event") as mock_live_mode_event:
mock_cam._stop_live_mode()
assert mock_live_mode_event.set.called
assert mock_cam._live_mode_event is None
def test_live_mode_property(mock_cam):
"""Test the live_mode property."""
assert mock_cam.live_mode is False
with mock.patch.object(mock_cam, "_start_live_mode") as mock_start_live_mode:
with mock.patch.object(mock_cam, "_stop_live_mode") as mock_stop_live_mode:
# Set to true
mock_cam.live_mode = True
assert mock_start_live_mode.called
assert mock_cam._live_mode is True
assert mock_start_live_mode.call_count == 1
# Second call should call _start_live_mode
mock_cam.live_mode = True
assert mock_start_live_mode.call_count == 1
# Set to false
mock_cam.live_mode = False
assert mock_stop_live_mode.called
assert mock_cam._live_mode is False
assert mock_stop_live_mode.call_count == 1

View File

@@ -52,7 +52,6 @@ def test_init(mock_bragg):
dev = mock_bragg
assert dev.name == "bragg"
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.move_type.get() == MoveType.ENERGY
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
@@ -61,29 +60,15 @@ def test_check_value(mock_bragg):
dev = mock_bragg
dev.low_lim._read_pv.mock_data = 0
dev.high_lim._read_pv.mock_data = 1
dev.low_limit_angle._read_pv.mock_data = 10
dev.high_limit_angle._read_pv.mock_data = 20
# Check that limits are taken correctly from angle or energy
# Energy first
move_type = MoveType.ENERGY
dev.move_type.set(move_type)
# nothing happens
dev.check_value(0.5)
with pytest.raises(LimitError):
dev.check_value(15)
# Angle next
move_type = MoveType.ANGLE
dev.move_type.set(move_type)
dev.check_value(15)
with pytest.raises(LimitError):
dev.check_value(0.5)
def test_egu(mock_bragg):
dev = mock_bragg
assert dev.egu == "eV"
dev.move_type.set(MoveType.ANGLE)
assert dev.egu == "deg"
def test_move_succeeds(mock_bragg):
@@ -135,16 +120,10 @@ def test_set_xtal(mock_bragg):
def test_set_xas_settings(mock_bragg):
dev = mock_bragg
dev.move_type.set(MoveType.ENERGY)
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
assert dev.scan_settings.s_scan_energy_hi.get() == 1
assert dev.scan_settings.s_scan_scantime.get() == 0.1
dev.move_type.set(MoveType.ANGLE)
dev.set_xas_settings(low=10, high=20, scan_time=1)
assert dev.scan_settings.s_scan_angle_lo.get() == 10
assert dev.scan_settings.s_scan_angle_hi.get() == 20
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_trig_settings(mock_bragg):
@@ -256,42 +235,44 @@ def test_kickoff_scan(mock_bragg):
assert dev.scan_control.scan_start_infinite.get() == 1
def test_complete(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_done._read_pv.mock_data = 0
# Normal case
status = dev.complete()
assert status.done is False
assert status.success is False
dev.scan_control.scan_done._read_pv.mock_data = 1
status.wait()
# time.sleep(0.2)
assert status.done is True
assert status.success is True
# FIXME #22 once mock_pv supports callbacks, high priority!
# def test_complete(mock_bragg):
# dev = mock_bragg
# dev.scan_control.scan_done._read_pv.mock_data = 0
# # Normal case
# status = dev.complete()
# assert status.done is False
# assert status.success is False
# dev.scan_control.scan_done._read_pv.mock_data = 1
# status.wait()
# # time.sleep(0.2)
# assert status.done is True
# assert status.success is True
# Stop called case
dev.scan_control.scan_done._read_pv.mock_data = 0
status = dev.complete()
assert status.done is False
assert status.success is False
dev.stop()
time.sleep(0.2)
assert status.done is True
assert status.success is False
# # Stop called case
# dev.scan_control.scan_done._read_pv.mock_data = 0
# status = dev.complete()
# assert status.done is False
# assert status.success is False
# dev.stop()
# time.sleep(0.2)
# assert status.done is True
# assert status.success is False
def test_unstage(mock_bragg):
mock_bragg.timeout_for_pvwait = 0.5
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
# FIXME #22 once mock_pv supports callbacks, high priority!
# def test_unstage(mock_bragg):
# mock_bragg.timeout_for_pvwait = 0.5
# mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
status = mock_bragg.unstage()
assert mock_put.call_count == 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with pytest.raises(TimeoutError):
mock_bragg.unstage()
assert mock_put.call_count == 1
# with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
# status = mock_bragg.unstage()
# assert mock_put.call_count == 0
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
# with pytest.raises(TimeoutError):
# mock_bragg.unstage()
# assert mock_put.call_count == 1
# TODO reimplement the test for stage method

View File

@@ -0,0 +1,88 @@
"""Tests for the Mo1BraggAngle class."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.mo1_bragg.mo1_bragg_angle import Mo1BraggAngle
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggStoppedError
# pylint: disable=protected-access
@pytest.fixture(scope="function")
def mock_bragg() -> Mo1BraggAngle:
"""Fixture for the Mo1BraggAngle device."""
name = "bragg"
prefix = "X01DA-OP-MO1:BRAGG:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Mo1BraggAngle(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_mo1_bragg_angle_init(mock_bragg):
"""Test the initialization of the Mo1BraggAngle device."""
assert mock_bragg.name == "bragg"
assert mock_bragg.prefix == "X01DA-OP-MO1:BRAGG:"
assert isinstance(mock_bragg.readback, ophyd.EpicsSignalRO)
assert isinstance(mock_bragg.setpoint, ophyd.EpicsSignalWithRBV)
assert isinstance(mock_bragg.low_lim, ophyd.EpicsSignalRO)
assert isinstance(mock_bragg.high_lim, ophyd.EpicsSignalRO)
def test_mo1_bragg_angle_egu(mock_bragg):
"""Test the engineering unit of the Mo1BraggAngle device."""
assert mock_bragg.egu == "deg"
def test_mo1_bragg_angle_limits(mock_bragg):
"""Test the limits of the Mo1BraggAngle device."""
mock_bragg.low_lim._read_pv.mock_data = -10
mock_bragg.high_lim._read_pv.mock_data = 10
assert mock_bragg.limits == (-10, 10)
def test_mo1_bragg_angle_move(mock_bragg):
"""Test the move method of the Mo1BraggAngle device."""
mock_bragg.setpoint.put(0)
mock_bragg.readback._read_pv.mock_data = 0 # EpicsSignalRO
# Change PV for motor is moving before starting the move
mock_bragg.motor_is_moving._read_pv.mock_data = 0 # EpicsSignalRO
status = mock_bragg.move(5)
assert status.done is False
# Check setpoint is set correctly
assert mock_bragg.setpoint.get() == 5
# Update the motor is moving PV to simulate that the move is done
mock_bragg.motor_is_moving._read_pv.mock_data = 1
assert mock_bragg.motor_is_moving.get() == 1
status.wait(timeout=5) # If the status does not resolve after 5 seconds, something is wrong
assert status.done is True
def test_mo1_bragg_angle_stop(mock_bragg):
"""Test the stop method of the Mo1BraggAngle device."""
assert mock_bragg.stopped is False
mock_bragg.stop()
assert mock_bragg.stopped is True
status = mock_bragg.move(5)
assert status.done is False
# stopped should be resetted
assert mock_bragg.stopped is False
with pytest.raises(Mo1BraggStoppedError):
mock_bragg.stop()
status.wait(timeout=5) # This should raise before due to stop() call

View File

@@ -0,0 +1,166 @@
# pylint: skip-file
import threading
from typing import Generator
from unittest import mock
import ophyd
import pytest
from bec_server.scan_server.scan_worker import ScanWorker
from ophyd.status import WaitTimeoutError
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.nidaq.nidaq import Nidaq, NidaqError
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
"""Scan worker fixture, utility to generate scan_info for a given scan name."""
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
def mock_nidaq() -> Generator[Nidaq, None, None]:
"""Fixture for the Nidaq device."""
name = "nidaq"
prefix = "nidaq:prefix_test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Nidaq(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_nidaq):
"""Test the initialization of the Nidaq device."""
dev = mock_nidaq
assert dev.name == "nidaq"
assert dev.prefix == "nidaq:prefix_test:"
assert dev.valid_scan_names == [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
def test_check_if_scan_name_is_valid(mock_nidaq):
"""Test the check_if_scan_name_is_valid method."""
dev = mock_nidaq
dev.scan_info.msg.scan_name = "xas_simple_scan"
assert dev._check_if_scan_name_is_valid()
dev.scan_info.msg.scan_name = "invalid_scan_name"
assert not dev._check_if_scan_name_is_valid()
def test_set_config(mock_nidaq):
dev = mock_nidaq
# TODO #21 Add test logic for set_config, issue created #
def test_on_connected(mock_nidaq):
"""Test the on_connected method of the Nidaq device."""
dev = mock_nidaq
dev.power.put(0)
dev.heartbeat._read_pv.mock_data = 0
# First scenario, raise timeout error
# This will raise a WaitTimeoutError error as we currently do not support callbacks in the MockPV
dev.timeout_wait_for_signal = 0.1
# To check that it raised, we check that dev.power PV is set to 1
# Set state PV to 0, 1 is expected value
dev.state._read_pv.mock_data = 0
with pytest.raises(WaitTimeoutError):
dev.on_connected()
assert dev.power.get() == 1
# TODO, once the MOCKPv supports callbacks, we can test the rest of the logic issue #22
# def test_on_stage(mock_nidaq):
# dev = mock_nidaq
# #TODO Add once MockPV supports callbacks #22
def test_on_kickoff(mock_nidaq):
"""Test the on_kickoff method of the Nidaq device."""
dev = mock_nidaq
dev.kickoff_call.put(0)
dev.kickoff()
assert dev.kickoff_call.get() == 1
def test_on_unstage(mock_nidaq):
"""Test the on_unstage method of the Nidaq device."""
dev = mock_nidaq
dev.state._read_pv.mock_data = 0 # Set state to 0, 1 is Standby
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
dev.enable_compression._read_pv.mock_data = 0 # Compression enabled
with pytest.raises(WaitTimeoutError):
dev.on_unstage()
dev.state._read_pv.mock_data = 1
# FIXME #22 add callback mechanism to MockPV to test the rest of the logic
# dev.on_unstage()
# assert dev.enable_compression.get() == 1
@pytest.mark.parametrize(
["scan_name", "raise_error", "nidaq_state"],
[
("line_scan", False, 0),
("xas_simple_scan", False, 3),
("xas_simple_scan", True, 0),
("nidaq_continuous_scan", False, 0),
],
)
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
"""Test the on_pre_scan method of the Nidaq device."""
dev = mock_nidaq
dev.state.put(nidaq_state)
dev.scan_info.msg.scan_name = scan_name
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
if not raise_error:
dev.pre_scan()
else:
with pytest.raises(WaitTimeoutError):
dev.pre_scan()
def test_on_complete(mock_nidaq):
"""Test the on_complete method of the Nidaq device."""
dev = mock_nidaq
# Check for nidaq_continuous_scan
dev.scan_info.msg.scan_name = "nidaq_continuous_scan"
dev.state.put(0) # Set state to DISABLED
status = dev.complete()
assert status.done is False
dev.state.put(1)
# Should resolve now
status.wait(timeout=5) # Wait for the status to complete
assert status.done is True
# Check for XAS simple scan
dev.scan_info.msg.scan_name = "xas_simple_scan"
dev.state.put(0) # Set state to ACQUIRE
dev.stop_call.put(0)
dev._timeout_wait_for_pv = 5
status = dev.on_complete()
assert status.done is False
assert dev.stop_call.get() == 1 # Should have called stop
dev.state.put(1) # Set state to STANDBY
# Should resolve now
status.wait(timeout=5) # Wait for the status to complete
assert status.done is True
# Test that it resolves if device is stopped
dev.state.put(0) # Set state to DISABLED
dev.stop()
status.wait(timeout=5)
assert status.done is True

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -92,18 +92,6 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -205,18 +193,6 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -312,18 +288,6 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -427,18 +391,6 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],

View File

@@ -0,0 +1,127 @@
# pylint: skip-file
from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
from debye_bec.scans import NIDAQContinuousScan
def get_instructions(request, ScanStubStatusMock):
request.metadata["RID"] = "my_test_request_id"
def fake_done():
"""
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
This is used to simulate the completion of the scan.
"""
yield False
yield False
yield True
def fake_complete(*args, **kwargs):
yield "fake_complete"
return ScanStubStatusMock(done_func=fake_done)
with (
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd or isinstance(cmd, str):
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
cmd.metadata.pop("device_instr_id", None)
return reference_commands
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(NIDAQContinuousScan, scan_duration=10)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["nidaq"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": 0,
"positions": [],
"scan_name": "nidaq_continuous_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="nidaq",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]