12 Commits

Author SHA1 Message Date
x10da d17f3deefa Introduced nexus data structure (same as Debye)
CI for superxas_bec / test (pull_request) Successful in 34s
CI for superxas_bec / test (push) Successful in 38s
2026-05-07 14:10:40 +02:00
x10da f70ac8743d Added frontend absorber 2026-05-07 14:10:04 +02:00
x10da 2981c436db Updated config files 2026-05-07 14:10:04 +02:00
x10da 09a0bc6372 Adding signals of additional nidaq signals 2026-05-07 14:10:04 +02:00
x10da fff1e21481 Renaming of offset signals 2026-05-07 14:10:04 +02:00
x10da 2bfa7b6ca3 Change of order of nidaq signals 2026-05-07 14:10:04 +02:00
x10da 3f79b300ed Implemented logic to move goniometer to correct position. Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 36cffb72a5 Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 55bc4585e2 Added new signals 2026-05-07 14:10:04 +02:00
x10da 4f1386f5e1 Bugfix Transistionstatus in kickoff 2026-05-07 14:10:04 +02:00
x10da d687d74a74 Added mono goniometer motor 2026-05-07 14:10:04 +02:00
hitz_s aa6270fb55 introduction of mo1_bragg and nidaq 2026-05-07 14:10:04 +02:00
29 changed files with 3949 additions and 5 deletions
+4
View File
@@ -12,7 +12,11 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = [
"numpy",
"scipy",
"xrt",
"websockets",
]
@@ -0,0 +1,239 @@
"""
X10DA / SuperXAS Beamline Parameters.
This file describes the parameter of each component of the SuperXAS beamline
to be used for raytracing and geometrical calculations.
"""
import os
import numpy as np
from collections import namedtuple
import xrt.backends.raycing.materials as rm
# if os.environ.get("USE_XRT", "True").lower() in ("1", "true", "yes"):
# import xrt.backends.raycing.materials as rm # type: ignore
# else:
# class _DummyClass:
# def __init__(self, *args, **kwargs):
# pass
# class _DummyMaterials:
# Material = _DummyClass
# CrystalSi = _DummyClass
# rm = _DummyMaterials()
# XRT definitions
filterBeryl = rm.Material('Be', rho=1.85, kind='plate')
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
stripeSi = rm.Material('Si', rho=2.33)
stripePt = rm.Material('Pt', rho=21.45)
stripeRh = rm.Material('Rh', rho=12.41)
stripeCr = rm.Material('Cr', rho=7.14)
stripePyrex = rm.Material('Si', rho=2.20) # Use Si as bare element and the density of SiO2
si111_1 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # first xtal surface
si311_1 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # first xtal surface
si333_1 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # first xtal surface
si511_1 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # first xtal surface
si111_2 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # second xtal surface
si311_2 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # second xtal surface
si333_2 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # second xtal surface
si511_2 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # second xtal surface
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterBe = rm.Material('Be', rho=1.85, kind='plate')
filterSi3N4 = rm.Material(['Si', 'N'], quantities=[3, 4], rho=3.44, kind='plate')
filterAl = rm.Material('Al', rho=2.69, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
sourceHeight = 0
#Synchrotron
synchrotron = namedtuple('synchrotron', ['eE', 'eI', 'eEspread',
'eEpsilonX', 'eEpsilonZ', 'betaX', 'betaZ'])
sls1 = synchrotron(
eE = 2.4,
eI = 0.4,
eEspread=0.878e-3,
eEpsilonX=5.63,
eEpsilonZ=0.007,
betaX=0.45,
betaZ=14.4,
)
sls2 = synchrotron(
eE=2.7,
eI=0.4,
eEspread=1.147e-3,
eEpsilonX=0.156,
eEpsilonZ=0.01,
betaX=0.18,
betaZ=4.6,
)
# Source
bendingMagnet = namedtuple('bendingMagnet', ['name', 'center', 'sync', 'B0'])
sls1_29t = bendingMagnet(
name='FE-BM-SLS1-2.9T',
center=(0, 0, 0),
sync=sls1,
B0=2.9,)
sls2_21t = bendingMagnet(
name='FE-BM-SLS2-2.1T',
center=(0, 0, 0),
sync=sls1,
B0=2.1,)
# FE slits
slits = namedtuple('slits', ['name', 'center', 'maxDivH', 'maxDivV'])
feSlits = slits(
name='FE-SLITS',
center=(0, 5290, sourceHeight),
maxDivH=1.8e-3,
maxDivV=0.8e-3,)
# Filters
filt = namedtuple('filt', ['name', 'center', 'pitch', 'limPhysX', 'limPhysY', 'surface', 'material', 'thickness'])
feWindow = filt(
name='FE-WINDOW',
center=(0., 6158, sourceHeight),
pitch=np.pi/2,
limPhysX=(-6, 6),
limPhysY=(-3., 3.),
surface='None',
material=filterDiamond,
thickness=0.1,)
feWindow = feWindow._replace(surface='CVD Diamond window {0:0.0f} $\mu$m'.format(feWindow.thickness*1e3))
feFilt = filt(
name='FE-FI',
center=(0., 6590, sourceHeight),
pitch=np.pi/2,
limPhysX=(-15, 15),
limPhysY=(-10, 10),
surface='None',
material=filterGraphite,
thickness=0.25,)
feFilt = feFilt._replace(surface='Graphite filter {0:0.0f} $\mu$m'.format(feFilt.thickness*1e3))
# Collimating mirror
collimatingMirror = namedtuple('collimatingMirror', ['name',
'center', 'surface', 'material', 'limPhysX', 'limPhysY',
'limOptX', 'limOptY', 'R', 'pitch', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
cm = collimatingMirror(
name='FE-CM',
center=[0, 7618, sourceHeight],
surface=('Rh','Si','Pt'),
material=(stripeRh, stripeSi, stripePt),
limPhysX=(-30, 30),
limPhysY=(-600, 600),
limOptX=((11, -2, -21), (21, 8, -5)),
limOptY=((-500, -500, -500), (500, 500, 500)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
jack1=[0., 7210., 0.], #Tripod X, Y, Z (global)
jack2=[-210., 8310., 0.],
jack3=[210., 8310., 0.],
tx1=[0.0, -575.5], # X-Stage 1 [x, y] (local)
tx2=[0.0, 575],) # X-Stage 2
apertures = namedtuple('apertures', ['name', 'center', 'opening'])
fePS = apertures(
name='FE-PS',
center=[0, 8760, sourceHeight],
opening=[-39/2, 39/2, -10, 29]) # left, right, bottom, top
opWbBsBlock = apertures(
name='OP-WB-BS-BLOCK',
center=[0., 13606-135, sourceHeight],
opening=[-18., 18., 42, 76]) # left, right, bottom, top
opSlits = apertures(
name='OP-SLITS',
center=[0, 14145-135, sourceHeight],
opening=[-35/2, 35/2, 47.5, 82.5])
# Monochromator
monochromator = namedtuple('monochromator', ['name', 'center',
'xtal', 'material1', 'material2', 'xtalWidth', 'xtalOffsetX',
'xtalLength1', 'xtalLength2', 'xtalGap', 'rotOffset',
'heightOffset', 'braggLim', 'jack1', 'jack2', 'jack3', 'tx'])
mo1 = monochromator(
name='OP-CCM1',
center=[0., 11670-135, sourceHeight],
xtal=('Si311','Si111'),
material1=(si311_1, si111_1),
material2=(si311_2, si111_2),
xtalWidth = (20, 20),
xtalOffsetX=(-19.2, 19.2),
xtalLength1 = (60, 60),
xtalLength2 = (60, 60),
xtalGap = (8, 8),
rotOffset = 6, # not sure what it is
heightOffset = 8.5, # not sure what it is
braggLim = [4, 35],
jack1=[0., 11350., 0.], #Tripod not available!
jack2=[-400., 12350., 0.],
jack3=[400., 12350., 0.],
tx=0.0,) # X-Stage [x]
# Focusing mirror
focusingMirror = namedtuple('focusingMirror', ['name', 'center',
'surfaceToroid', 'materialToroid',
'limPhysXToroid', 'limPhysYToroid',
'limOptXToroid', 'limOptYToroid',
'R', 'pitch', 'r', 'xToroid', 'hToroid', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
fm = focusingMirror(
name='OP-FM',
center=[0., 15580-135, sourceHeight],
surfaceToroid=('Rh', 'Pt'),
materialToroid=(stripeRh, stripePt),
limPhysXToroid=(-54., 54.),
limPhysYToroid=(-565., 565.),
limOptXToroid=((90.25, 41.75), (51.75, 5.75)), # With old VME axis, no absolute value!
limOptYToroid=((-500., -500.), (500., 500.)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
r=[30, 20],
xToroid=[24.126, -22,874], # offset in local x
hToroid=[7., 11.3], # depth of the cylinder at x = xCylinder1 and x = xCylinder2.
jack1=[0., 14980., 0.],
jack2=[-75., 16180., 0.],
jack3=[75., 16180., 0.],
tx1=[0., -575.], # X-Stage 1 [x, y]
tx2=[0., 575.],) # X-Stage 2 [x, y]
ehWindow = filt(
name='EH-WINDOW',
center=(0., 22225-135, sourceHeight),
pitch=np.pi/2,
limPhysX=(-10., 10.),
limPhysY=(17.5, 92.5),
surface='None',
material=filterBe,
thickness=0.25,)
ehWindow = ehWindow._replace(surface='Beryllium window {0:0.0f} $\mu$m'.format(ehWindow.thickness*1e3))
# Sample
sample = namedtuple('sample', ['name', 'center'])
smpl = sample(
name='OP-SMPL',
center=[0, 24000-135, sourceHeight],)
@@ -1,4 +1,3 @@
#######################################
## Beam Monitors 2 and 3 -- Virtual positioners
@@ -0,0 +1,243 @@
###################################
## Frontend Absorber ##
###################################
abs:
readoutPriority: baseline
description: Frontend Absorber
deviceClass: superxas_bec.devices.absorber.Absorber
deviceConfig:
prefix: "X10DA-FE-ABS1:"
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Frontend Slits ##
###################################
sldi_trxr:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sldi_trxw:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryb:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryt:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centerx:
readoutPriority: baseline
description: Front-end slit diaphragm X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapx:
readoutPriority: baseline
description: Front-end slit diaphragm X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centery:
readoutPriority: baseline
description: Front-end slit diaphragm Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapy:
readoutPriority: baseline
description: Front-end slit diaphragm Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Collimating Mirror ##
###################################
cm_trxu:
readoutPriority: baseline
description: Collimating Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trxd:
readoutPriority: baseline
description: Collimating Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
cm_tryu:
readoutPriority: baseline
description: Collimating Mirror Y-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydr:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDR
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydw:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDW
onFailure: retry
enabled: true
softwareTrigger: false
cm_bnd:
readoutPriority: baseline
description: Collimating Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:BND
onFailure: retry
enabled: true
softwareTrigger: false
# cm_bnd_radius:
# readoutPriority: baseline
# description: Collimating Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-CM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
cm_rotx:
readoutPriority: baseline
description: Collimating Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
cm_roty:
readoutPriority: baseline
description: Collimating Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
cm_rotz:
readoutPriority: baseline
description: Collimating Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
cm_trx:
readoutPriority: baseline
description: Collimating Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_try:
readoutPriority: baseline
description: Collimating Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ztcp:
readoutPriority: baseline
description: Collimating Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_xstripe:
readoutPriority: baseline
description: Collimating Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
@@ -0,0 +1,18 @@
###################################
## SLS Machine ##
###################################
curr:
readoutPriority: baseline
description: SLS ring current
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: AGEBD-DBPM3CURR:CURRENT-AVG
deviceTags:
- machine
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
@@ -0,0 +1,396 @@
###################################
## Monochromator ##
###################################
mo1_try:
readoutPriority: baseline
description: Monochromator Y Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo1_trx:
readoutPriority: baseline
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 1 ##
###################################
# sl1_trxr:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_trxw:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryb:
# readoutPriority: baseline
# description: Optics slits 1 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryt:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm1_try:
# readoutPriority: baseline
# description: Beam Monitor 1 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM1:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centerx:
# readoutPriority: baseline
# description: Optics slits 1 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapx:
# readoutPriority: baseline
# description: Optics slits 1 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centery:
# readoutPriority: baseline
# description: Optics slits 1 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapy:
# readoutPriority: baseline
# description: Optics slits 1 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
###################################
## Focusing Mirror ##
###################################
# fm_trxu:
# readoutPriority: baseline
# description: Focusing Mirror X-translation upstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXU
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_trxd:
# readoutPriority: baseline
# description: Focusing Mirror X-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryd:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryur:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream ring
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryuw:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream wall
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUW
# onFailure: retry
# enabled: true
# softwareTrigger: false
fm_bnd:
readoutPriority: baseline
description: Focusing Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP-MI1:TRB
onFailure: retry
enabled: true
softwareTrigger: false
# fm_bnd_radius:
# readoutPriority: baseline
# description: Focusing Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-FM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
fm_rotx:
readoutPriority: baseline
description: Focusing Morror Pitch
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:pitch
onFailure: retry
enabled: true
softwareTrigger: false
fm_roty:
readoutPriority: baseline
description: Focusing Morror Yaw
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:yaw
onFailure: retry
enabled: true
softwareTrigger: false
fm_rotz:
readoutPriority: baseline
description: Focusing Morror Roll
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:roll
onFailure: retry
enabled: true
softwareTrigger: false
fm_trx:
readoutPriority: baseline
description: Focusing Morror Center Point X
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:trans
onFailure: retry
enabled: true
softwareTrigger: false
fm_try:
readoutPriority: baseline
description: Focusing Morror Center Point Y
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:y
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 2 ##
###################################
# sl2_trxr:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_trxw:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryb:
# readoutPriority: baseline
# description: Optics slits 2 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryt:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm2_try:
# readoutPriority: baseline
# description: Beam Monitor 2 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM2:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centerx:
# readoutPriority: baseline
# description: Optics slits 2 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapx:
# readoutPriority: baseline
# description: Optics slits 2 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centery:
# readoutPriority: baseline
# description: Optics slits 2 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapy:
# readoutPriority: baseline
# description: Optics slits 2 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
@@ -0,0 +1,89 @@
###################################
## General ##
###################################
## SLS Machine
machine_config:
- !include ./x10da_machine.yaml
## Beam Monitors OP + EH
# beam_monitors_config:
# - !include ./x10da_beam_monitors.yaml
###################################
## Frontend ##
###################################
## Frontend
frontend_config:
- !include ./x10da_frontend.yaml
###################################
## Optics Hutch ##
###################################
## Bragg Monochromator
mo1_gonio:
readoutPriority: baseline
description: Monochromator ROTX Goniometer
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X10DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
# mo1_bragg_angle:
# readoutPriority: baseline
# description: Positioner for the Monochromator
# deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
# deviceConfig:
# prefix: "X10DA-OP-MO1:BRAGG:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Remaining optics hutch
optics_config:
- !include ./x10da_optics.yaml
###################################
## Experimental Hutch ##
###################################
# ## NIDAQ
nidaq:
readoutPriority: monitored
description: NIDAQ backend for data reading for superxas scans
deviceClass: superxas_bec.devices.nidaq.nidaq.Nidaq
deviceConfig:
prefix: "X10DA-CPCL-SCANSERVER:"
onFailure: retry
enabled: true
softwareTrigger: false
## XAS (ICx, SDD, ref foils)
# xas_config:
# - !include ./x10da_xas.yaml
## XRD (Pilatus, pinhole, beamstop)
#xrd_config:
# - !include ./x10da_xrd.yaml
# Commented out because too slow
## Hutch cameras
# hutch_cams:
# - !include ./x10da_hutch_cameras.yaml
## Remaining experimental hutch
# es_config:
# - !include ./x10da_experimental_hutch.yaml
+72
View File
@@ -0,0 +1,72 @@
"""Frontend Absorber"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class AbsorberError(Exception):
"""Absorber specific exception"""
class STATUS(int, enum.Enum):
"""Absorber States"""
MOVING_CLOSE = 0
OPEN = 1
MOVING_OPEN = 2
CLOSED = 3
NOT_ENABLED = 4
TIMEOUT_CLOSE = 5
TIMEOUT_OPEN = 6
CLOSE_LS_LOST = 7
OPEN_LS_LOST = 8
CLOSE_LS_NOT_FREE = 9
OPEN_LS_NOT_FREE = 10
ERROR_LS = 11
TO_CONNECT = 12
MAN_OPEN = 13
UNDEFINED = 14
class Absorber(PSIDeviceBase):
"""Class for the Frontend Absorber"""
USER_ACCESS = ["open", "close"]
request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status")
status_string = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", string=True, doc="Absorber Status")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_for_move = 10
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
def open(self) -> DeviceStatus | None:
"""Open the Absorber"""
if self.status.get() == STATUS.CLOSED:
self.request.put(1)
status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)
status = status_open
return status
else:
return None
def close(self) -> DeviceStatus | None:
"""Close the Absorber"""
if self.status.get() == STATUS.OPEN:
self.request.put(1)
status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
status = status_close
return status
else:
return None
+466
View File
@@ -0,0 +1,466 @@
"""Module for the Mo1 Bragg positioner of the SuperXAS beamline.
The softIOC is reachable via the EPICS prefix X10DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is a scan controller
to setup XAS scans. A few scan modes are programmed in the controller, e.g. simple and advanced XAS scans.
Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is
used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
TriggerControlMode,
TriggerControlSource,
)
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
# Initialise logger
logger = bec_logger.logger
########### Exceptions ###########
class Mo1BraggError(Exception):
"""Exception for the Mo1 Bragg positioner"""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
scan_duration: float | None = Field(None, description="Duration of the scan")
xrd_enable_low: bool | None = Field(
None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
) # trig_enable_low: bool = None
xrd_enable_high: bool | None = Field(
None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
) # trig_enable_high: bool = None
exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
start: float | None = Field(None, description="Start value for energy/angle")
stop: float | None = Field(None, description="Stop value for energy/angle")
p_kink: float | None = Field(None, description="P Kink")
e_kink: float | None = Field(None, description="Energy Kink")
model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"""Mo1 Bragg motor for the SuperXAS beamline.
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.timeout_for_pvwait = 7.5
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
)
else:
return
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
# logger.info(f"Sleeping for one second")
# time.sleep(1)
# logger.info(f"Device {self.name}, done sleeping")
# Load the scan parameters to the controller
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
self.cancel_on_stop(status)
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
status.wait(self.timeout_for_pvwait)
return None
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False
if self.scan_control.scan_msg.get() in [
ScanControlLoadMessage.STARTED,
ScanControlLoadMessage.SUCCESS,
]:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
try:
status.wait(2)
return None
except WaitTimeoutError:
logger.warning(
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
else:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
return None
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
status = CompareStatus(self.scan_control.scan_done, 1)
self.cancel_on_stop(status)
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
scan_duration = self.scan_control.scan_duration.get()
# TODO implement better logic for infinite scans, at least bring it up with SuperXAS
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
status = TransitionStatus(
self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True,
failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
)
self.cancel_on_stop(status)
start_func(1)
return status
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
max_value = 100
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.
Args:
low (float): Low energy/angle value of the scan
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
"""
status_list = []
status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
@typechecked
def convert_angle_energy(
self, mode: Literal["AngleToEnergy", "EnergyToAngle"], inp: float
) -> float:
"""Calculate energy to angle or vice versa
Args:
mode (Literal["AngleToEnergy", "EnergyToAngle"]): Mode of calculation
input (float): Either angle or energy
Returns:
output (float): Converted angle or energy
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25) #TODO needed still? Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
def set_advanced_xas_settings(
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
) -> None:
"""Set Advanced XAS parameters for upcoming scan.
Args:
low (float): Low angle value of the scan in eV
high (float): High angle value of the scan in eV
scan_time (float): Time for a half oscillation in s
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
"""
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
pos, vel, dt = compute_spline(
low_deg=low_deg,
high_deg=high_deg,
p_kink=p_kink,
e_kink_deg=e_kink_deg,
scan_time=scan_time,
)
status_list = []
status_list.append(self.scan_settings.a_scan_pos.set(pos))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_vel.set(vel))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_time.set(dt))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_trig_settings(
self,
enable_low: bool,
enable_high: bool,
exp_time_low: int,
exp_time_high: int,
cycle_low: int,
cycle_high: int,
) -> None:
"""Set TRIG settings for the upcoming scan.
Args:
enable_low (bool): Enable TRIG for low energy/angle
enable_high (bool): Enable TRIG for high energy/angle
num_trigger_low (int): Number of triggers for low energy/angle
num_trigger_high (int): Number of triggers for high energy/angle
exp_time_low (int): Exposure time for low energy/angle
exp_time_high (int): Exposure time for high energy/angle
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
status_list = []
status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
Args:
mode (ScanControlMode): Mode for the scan, either simple or advanced
scan_duration (float): Duration of the scan
"""
val = ScanControlMode(mode).value
status_list = []
status_list.append(self.scan_control.scan_mode_enum.set(val))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_control.scan_duration.set(scan_duration))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
@@ -0,0 +1,20 @@
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
class Mo1BraggAngle(Mo1BraggPositioner):
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
@property
def egu(self) -> str:
"""Return the engineering unit of the positioner."""
return "deg"
@@ -0,0 +1,407 @@
"""Module for the Mo1 Bragg positioner"""
import threading
import time
import traceback
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
PositionerBase,
Signal,
)
from ophyd.utils import LimitError
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import MoveType
# Initialise logger
logger = bec_logger.logger
############# Exceptions #############
class Mo1BraggStoppedError(Exception):
"""Exception to raise when the Bragg positioner is stopped."""
############# Signal classes #############
class MoveTypeSignal(Signal):
"""Custom Signal to set the move type of the Bragg positioner"""
# pylint: disable=arguments-differ
def set(self, value: str | MoveType) -> None:
"""Returns currently active move method
Args:
value (str | MoveType) : Can be either 'energy' or 'angle'
"""
value = MoveType(value.lower())
self._readback = value.value
############# Utility devices to separate the namespace #############
class Mo1BraggStatus(Device):
"""Mo1 Bragg PVs for status monitoring"""
error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
mot_commutated = Cpt(
EpicsSignalRO, suffix="mot_commutated_RBV", kind="config", auto_monitor=True
)
axis_enabled = Cpt(EpicsSignalRO, suffix="axis_enabled_RBV", kind="config", auto_monitor=True)
heartbeat = Cpt(EpicsSignalRO, suffix="heartbeat_RBV", kind="config", auto_monitor=True)
class Mo1BraggCrystal(Device):
"""Mo1 Bragg PVs to set the crystal parameters"""
bragg_off_si111 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si111", kind="config")
bragg_off_si311 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si311", kind="config")
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
current_d_spacing = Cpt(
EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
)
current_bragg_off = Cpt(
EpicsSignalRO, suffix="current_bragg_off_RBV", kind="normal", auto_monitor=True
)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
)
current_xtal_string = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
)
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""
# TRIG settings
trig_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="trig_select_ref_ENUM", kind="config")
trig_ena_hi_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_hi_ENUM", kind="config")
trig_time_hi = Cpt(EpicsSignalWithRBV, suffix="trig_time_hi", kind="config")
trig_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_hi", kind="config")
trig_ena_lo_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_lo_ENUM", kind="config")
trig_time_lo = Cpt(EpicsSignalWithRBV, suffix="trig_time_lo", kind="config")
trig_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_lo", kind="config")
# XAS simple scan settings
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
s_scan_energy_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
)
s_scan_energy_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
)
s_scan_scantime = Cpt(
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
)
# XAS advanced scan settings
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1TriggerSettings(Device):
"""Mo1 Trigger settings"""
settle_time = Cpt(EpicsSignalWithRBV, suffix="settle_time", kind="config")
max_dev = Cpt(EpicsSignalWithRBV, suffix="max_dev", kind="config")
xrd_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_src_ENUM", kind="config")
xrd_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_mode_ENUM", kind="config")
xrd_trig_len = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_len", kind="config")
xrd_trig_req = Cpt(EpicsSignal, suffix="xrd_trig_req", kind="config")
falcon_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_src_ENUM", kind="config")
falcon_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_mode_ENUM", kind="config")
falcon_trig_len = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_len", kind="config")
falcon_trig_req = Cpt(EpicsSignal, suffix="falcon_trig_req", kind="config")
univ1_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_src_ENUM", kind="config")
univ1_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_mode_ENUM", kind="config")
univ1_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_len", kind="config")
univ1_trig_req = Cpt(EpicsSignal, suffix="univ1_trig_req", kind="config")
univ2_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_src_ENUM", kind="config")
univ2_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_mode_ENUM", kind="config")
univ2_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_len", kind="config")
univ2_trig_req = Cpt(EpicsSignal, suffix="univ2_trig_req", kind="config")
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
class Mo1BraggScanControl(Device):
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
scan_duration = Cpt(
EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
)
scan_load = Cpt(EpicsSignal, suffix="scan_load", kind="config", put_complete=True)
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
scan_start_infinite = Cpt(
EpicsSignal, suffix="scan_start_infinite", kind="config", put_complete=True
)
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="config", put_complete=True)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
)
scan_time_left = Cpt(
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
scan_spectra_done = Cpt(
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
)
scan_spectra_left = Cpt(
EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
)
class Mo1BraggPositioner(Device, PositionerBase):
"""
Positioner implementation with readback energy of the MO1 Bragg positioner.
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
This soft IOC connects to the NI motor and its control loop.
"""
USER_ACCESS = ["set_xtal"]
####### Sub-components ########
# Namespace is cleaner and easier to maintain
crystal = Cpt(Mo1BraggCrystal, "")
scan_settings = Cpt(Mo1BraggScanSettings, "")
trigger_settings = Cpt(Mo1TriggerSettings, "")
calculator = Cpt(Mo1BraggCalculator, "")
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
############# Energy PVs #############
readback = Cpt(
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
)
setpoint = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
)
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
########## Move Command PVs ##########
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config", put_complete=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
SUB_PROGRESS = "progress"
def __init__(self, prefix="", *, name: str, **kwargs):
"""Initialize the Mo1 Bragg positioner.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, **kwargs)
self._move_thread = None
self._stopped = False
self.readback.name = self.name
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner
Args:
success (bool) : Flag to indicate if the motion was successful
"""
self.move_stop.put(1)
self._stopped = True
super().stop(success=success)
def stop_scan(self) -> None:
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
self.scan_control.scan_stop.put(1)
@property
def stopped(self) -> bool:
"""Return the status of the positioner"""
return self._stopped
######### Positioner specific methods #########
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
return (self.low_lim.get(), self.high_lim.get())
@property
def low_limit(self) -> float:
"""Return low limit of axis"""
return self.limits[0]
@property
def high_limit(self) -> float:
"""Return high limit of axis"""
return self.limits[1]
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
return "eV"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
return self.readback.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Method to check if a value is within limits of the positioner.
Called by PositionerBase.move()
Args:
value (float) : value to move axis to.
"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def _move_and_finish(
self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
) -> None:
"""
Method to be called in the move thread to move the Bragg positioner
to the target position.
Args:
target_pos (float) : target position for the motion
move_cpt (Cpt) : component to set the target position on the IOC,
either setpoint or setpoint_abs_angle depending
on the move type
read_cpt (Cpt) : component to read the current position of the motion,
readback or feedback_pos_angle
status (DeviceStatus) : status object to set the status of the motion
update_frequency (float): Optional, frequency to update the current position of
the motion, defaults to 0.1s
"""
motor_name = None
try:
# Set the target position on IOC
self.setpoint.put(target_pos)
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
motor_name = self.name
while self.motor_is_moving.get() == 0:
if self.stopped:
raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
time.sleep(update_frequency)
# pylint: disable=protected-access
status.set_finished()
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(
f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
)
status.set_exception(exc=exc)
def move(self, value: float, **kwargs) -> DeviceStatus:
"""
Move the Bragg positioner to the specified value, allows to
switch between move types angle and energy.
Args:
value (float) : target value for the motion
move_type (str | MoveType) : Optional, specify the type of move,
either 'energy' or 'angle'
Returns:
DeviceStatus : status object to track the motion
"""
self._stopped = False
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, status, 0.1)
)
self._move_thread.start()
return status
# -------------- End of Positioner specific methods -----------------#
# -------------- MO1 Bragg specific methods -----------------#
def set_xtal(
self,
xtal_enum: Literal["111", "311"],
bragg_off_si111: float = None,
bragg_off_si311: float = None,
d_spacing_si111: float = None,
d_spacing_si311: float = None,
) -> None:
"""Method to set the crystal parameters of the Bragg positioner
Args:
xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
bragg_off_si111 (float) : Offset for the 111 crystal
bragg_off_si311 (float) : Offset for the 311 crystal
d_spacing_si111 (float) : d-spacing for the 111 crystal
d_spacing_si311 (float) : d-spacing for the 311 crystal
"""
if bragg_off_si111 is not None:
self.crystal.bragg_off_si111.put(bragg_off_si111)
if bragg_off_si311 is not None:
self.crystal.bragg_off_si311.put(bragg_off_si311)
if d_spacing_si111 is not None:
self.crystal.d_spacing_si111.put(d_spacing_si111)
if d_spacing_si311 is not None:
self.crystal.d_spacing_si311.put(d_spacing_si311)
if xtal_enum == "111":
crystal_set = 0
elif xtal_enum == "311":
crystal_set = 1
else:
raise ValueError(
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.put(crystal_set)
self.crystal.set_offset.put(1)
@@ -0,0 +1,61 @@
"""Enums for the Bragg positioner and trigger generator"""
import enum
class TriggerControlSource(int, enum.Enum):
"""Enum class for the trigger control source of the trigger generator"""
EPICS = 0
INPOS = 1
class TriggerControlMode(int, enum.Enum):
"""Enum class for the trigger control mode of the trigger generator"""
PULSE = 0
CONDITION = 1
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of the Bragg positioner"""
PARAMETER_WRONG = 0
VALIDATION_PENDING = 1
READY = 2
RUNNING = 3
class ScanControlLoadMessage(int, enum.Enum):
"""Enum for validating messages for load message of the Bragg positioner"""
PENDING = 0
STARTED = 1
SUCCESS = 2
ERR_TRIG_MEAS_LEN_LOW = 3
ERR_TRIG_N_TRIGGERS_LOW = 4
ERR_TRIG_TRIGS_EVERY_N_LOW = 5
ERR_TRIG_MEAS_LEN_HI = 6
ERR_TRIG_N_TRIGGERS_HI = 7
ERR_TRIG_TRIGS_EVERY_N_HI = 8
ERR_SCAN_HI_ANGLE_LIMIT = 9
ERR_SCAN_LOW_ANGLE_LIMITS = 10
ERR_SCAN_TIME = 11
ERR_SCAN_VEL_TOO_HI = 12
ERR_SCAN_ANGLE_OUT_OF_LIM = 13
ERR_SCAN_HIGH_VEL_LAR_42 = 14
ERR_SCAN_MODE_INVALID = 15
class MoveType(str, enum.Enum):
"""Enum class to switch between move types energy and angle for the Bragg positioner"""
ENERGY = "energy"
ANGLE = "angle"
class ScanControlMode(int, enum.Enum):
"""Enum class for the scan control mode of the Bragg positioner"""
SIMPLE = 0
ADVANCED = 1
@@ -0,0 +1,93 @@
"""Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline
################ Define Constants ############
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
# otherwise peak value will not be included
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
class Mo1UtilsSplineError(Exception):
"""Exception for spline computation"""
def compute_spline(
low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float
) -> tuple[float, float, float]:
"""Spline computation for the advanced scan mode
Args:
low_deg (float): Low angle value of the scan in deg
high_deg (float): High angle value of the scan in deg
scan_time (float): Time for a half oscillation in s
p_kink (float): Position of kink in %
e_kink_deg (float): Position of kink in degree
Returns:
tuple[float,float,float] : Position, Velocity and delta T arrays for the spline
"""
# increase motion range slightly so that xas trigger signals will occur at defined energy limits
low_deg = low_deg - POSITION_COMPONSATION
high_deg = high_deg + POSITION_COMPONSATION
if not (0 <= p_kink <= 100):
raise Mo1UtilsSplineError(
"Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}"
)
if not (low_deg < e_kink_deg < high_deg):
raise Mo1UtilsSplineError(
"Kink energy not within selected energy range of scan,"
+ f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"
+ f"high_deg {high_deg}."
)
tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + (
SAFETY_FACTOR - tc1
)
t_input = [
0,
SAFETY_FACTOR - tc1,
t_kink,
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
scan_time - TIME_COMPENSATE_SPLINE,
]
p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg]
cv = np.stack((t_input, p_input)).T # spline coefficients
max_param = len(cv) - DEGREE_SPLINE
kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param) # knots
spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function
p = spl(np.linspace(0, max_param, N_SAMPLES))
v = spl(np.linspace(0, max_param, N_SAMPLES), 1)
a = spl(np.linspace(0, max_param, N_SAMPLES), 2)
j = spl(np.linspace(0, max_param, N_SAMPLES), 3)
tim, pos = p.T
pos = pos + low_deg
vel = v[:, 1] / v[:, 0]
acc = []
for item in a:
acc.append(0) if item[1] == 0 else acc.append(item[1] / item[0])
jerk = []
for item in j:
jerk.append(0) if item[1] == 0 else jerk.append(item[1] / item[0])
dt = np.zeros(len(tim))
for i in np.arange(len(tim)):
if i == 0:
dt[i] = 0
else:
dt[i] = 1000 * (tim[i] - tim[i - 1])
return pos, vel, dt
+706
View File
@@ -0,0 +1,706 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from superxas_bec.devices.nidaq.nidaq_enums import (
EncoderFactors,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class NidaqError(Exception):
"""Nidaq specific error"""
class NidaqControl(Device):
"""Nidaq control class with all PVs"""
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
smpl_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
)
ref_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
)
cisum = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
)
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
)
ai1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
)
ai2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
)
ai3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
)
ai4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
)
ai5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
)
ai6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
)
ai7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
)
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
di2_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 2, MAX")
di3_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 3, MAX")
di4_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 4, MAX")
ci0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
)
ci1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
)
ci2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
)
ci3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
)
ci4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
)
ci5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
)
ci6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
)
ci7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
)
ci8_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8, MEAN"
)
ci9_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9, MEAN"
)
### Readback PVs for EpicsEmitter ###
ai0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI0",
kind=Kind.normal,
doc="EPICS analog input 0",
auto_monitor=True,
)
ai1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI1",
kind=Kind.normal,
doc="EPICS analog input 1",
auto_monitor=True,
)
ai2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI2",
kind=Kind.normal,
doc="EPICS analog input 2",
auto_monitor=True,
)
ai3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI3",
kind=Kind.normal,
doc="EPICS analog input 3",
auto_monitor=True,
)
ai4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI4",
kind=Kind.normal,
doc="EPICS analog input 4",
auto_monitor=True,
)
ai5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI5",
kind=Kind.normal,
doc="EPICS analog input 5",
auto_monitor=True,
)
ai6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI6",
kind=Kind.normal,
doc="EPICS analog input 6",
auto_monitor=True,
)
ai7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI7",
kind=Kind.normal,
doc="EPICS analog input 7",
auto_monitor=True,
)
ci0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI0",
kind=Kind.normal,
doc="EPICS counter input 0",
auto_monitor=True,
)
ci1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI1",
kind=Kind.normal,
doc="EPICS counter input 1",
auto_monitor=True,
)
ci2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI2",
kind=Kind.normal,
doc="EPICS counter input 2",
auto_monitor=True,
)
ci3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI3",
kind=Kind.normal,
doc="EPICS counter input 3",
auto_monitor=True,
)
ci4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI4",
kind=Kind.normal,
doc="EPICS counter input 4",
auto_monitor=True,
)
ci5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI5",
kind=Kind.normal,
doc="EPICS counter input 5",
auto_monitor=True,
)
ci6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI6",
kind=Kind.normal,
doc="EPICS counter input 6",
auto_monitor=True,
)
ci7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI7",
kind=Kind.normal,
doc="EPICS counter input 7",
auto_monitor=True,
)
ci8 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI8",
kind=Kind.normal,
doc="EPICS counter input 8",
auto_monitor=True,
)
ci9 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI9",
kind=Kind.normal,
doc="EPICS counter input 9",
auto_monitor=True,
)
di0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI0",
kind=Kind.normal,
doc="EPICS digital input 0",
auto_monitor=True,
)
di1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI1",
kind=Kind.normal,
doc="EPICS digital input 1",
auto_monitor=True,
)
di2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI2",
kind=Kind.normal,
doc="EPICS digital input 2",
auto_monitor=True,
)
di3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI3",
kind=Kind.normal,
doc="EPICS digital input 3",
auto_monitor=True,
)
di4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI4",
kind=Kind.normal,
doc="EPICS digital input 4",
auto_monitor=True,
)
enc_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENC",
kind=Kind.normal,
doc="EPICS Encoder reading",
auto_monitor=True,
)
energy_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENERGY",
kind=Kind.normal,
doc="EPICS Energy reading",
auto_monitor=True,
)
### Readback for BEC emitter ###
ai0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
)
ai1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
)
ai2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
)
ai3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
)
ai4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
)
ai5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
)
ai6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
)
ai7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
)
ci0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
)
ci1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
)
ci2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
)
ci3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
)
ci4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
)
ci5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
)
ci6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
)
ci7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
)
ci8_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8. STD"
)
ci9_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9. STD"
)
xas_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XAS timestamp")
# xrd_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD timestamp")
# xrd_angle = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD angle")
# xrd_energy = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD energy")
# xrd_ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 mean")
# xrd_ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 std dev")
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
### Control PVs ###
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
# enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config, auto_monitor=True)
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
add_chans = Cpt(EpicsSignal, suffix="NIDAQ-AddChans", kind=Kind.config, auto_monitor=True)
smpl_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_ln", kind=Kind.config, auto_monitor=True)
ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
class Nidaq(PSIDeviceBase, NidaqControl):
"""NIDAQ ophyd wrapper around the NIDAQ backend currently running at x10da-nidaq-01
Args:
prefix (str) : Prefix to the NIDAQ soft ioc, currently X10DA-CPCL-SCANSERVER:
name (str) : Name of the device
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_config"]
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
########################################
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_name in self.valid_scan_names:
return True
return False
def set_config(
self,
sampling_rate: Literal[
100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000
],
ai: list,
ci: list,
di: list,
scan_type: Literal["continuous", "triggered"] = "triggered",
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4",
enable_compression: bool = True,
) -> None:
"""Method to configure the NIDAQ
Args:
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
10000000, 14286000]): Sampling rate in Hz
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
otherwise continuous, default 'triggered'
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
enable_compression(bool): Enable or disable compression of data, default True
"""
if sampling_rate == 100000:
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
elif sampling_rate == 500000:
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
elif sampling_rate == 1000000:
self.sampling_rate.put(ScanRates.ONE_MHZ)
elif sampling_rate == 2000000:
self.sampling_rate.put(ScanRates.TWO_MHZ)
elif sampling_rate == 4000000:
self.sampling_rate.put(ScanRates.FOUR_MHZ)
elif sampling_rate == 5000000:
self.sampling_rate.put(ScanRates.FIVE_MHZ)
elif sampling_rate == 10000000:
self.sampling_rate.put(ScanRates.TEN_MHZ)
elif sampling_rate == 14286000:
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
ai_chans = 0
if isinstance(ai, list):
for ch in ai:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ai_chans = ai_chans | (1 << ch)
self.ai_chans.put(ai_chans)
ci_chans = 0
if isinstance(ci, list):
for ch in ci:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ci_chans = ci_chans | (1 << ch)
self.ci_chans.put(ci_chans)
di_chans = 0
if isinstance(di, list):
for ch in di:
if isinstance(ch, int):
if ch >= 0 and ch <= 4:
di_chans = di_chans | (1 << ch)
self.di_chans.put(di_chans)
if scan_type in "continuous":
self.scan_type.put(ScanType.CONTINUOUS)
elif scan_type in "triggered":
self.scan_type.put(ScanType.TRIGGERED)
if scan_duration >= 0:
self.scan_duration.put(scan_duration)
if readout_range == 1:
self.readout_range.put(ReadoutRange.ONE_V)
elif readout_range == 2:
self.readout_range.put(ReadoutRange.TWO_V)
elif readout_range == 5:
self.readout_range.put(ReadoutRange.FIVE_V)
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in "1/16":
self.encoder_factor.put(EncoderFactors.X1_16)
elif encoder_type in "1/8":
self.encoder_factor.put(EncoderFactors.X1_8)
elif encoder_type in "1/4":
self.encoder_factor.put(EncoderFactors.X1_4)
elif encoder_type in "1/2":
self.encoder_factor.put(EncoderFactors.X1_2)
elif encoder_type in "1":
self.encoder_factor.put(EncoderFactors.X1)
elif encoder_type in "2":
self.encoder_factor.put(EncoderFactors.X2)
elif encoder_type in "4":
self.encoder_factor.put(EncoderFactors.X4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
elif enable_compression is False:
self.enable_compression.put(NIDAQCompression.OFF)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
try:
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
except WaitTimeoutError:
logger.warning(f"Device {self.name} was not alive, trying to put power on")
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
self.power.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.time_left.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
return None
if self.state.get() != NidaqState.STANDBY:
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
self.on_stop()
status.wait(timeout=self.timeout_wait_for_signal)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
timeout=self._timeout_wait_for_pv
)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
timeout=self._timeout_wait_for_pv
)
# Stage call to IOC
status = CompareStatus(self.state, NidaqState.STAGE)
self.cancel_on_stop(status)
# TODO 11.11.25/HS64
# Switched from set to put in the hope to get rid of the rare event where nidaq is stopped at the start of a scan
# Problems consistently persisting, testing changing back to set, unconvinced this is the actual cause 14.11.25/AHC
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_kickoff(self) -> DeviceStatus | StatusBase:
"""Kickoff the Nidaq"""
status = self.kickoff_call.set(1)
self.cancel_on_stop(status)
return status
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
status = self.enable_compression.set(1)
self.cancel_on_stop(status)
status.wait(self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""
Called right before the scan starts on all devices automatically.
Here we ensure that the NIDAQ master task is running
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
status = CompareStatus(self.state, NidaqState.KICKOFF)
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""
Called to inquire if a device has completed a scans.
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
return None
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
max_value = scan_duration
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_call.put(1)
+60
View File
@@ -0,0 +1,60 @@
import enum
class NIDAQCompression(str, enum.Enum):
"""Options for Compression"""
OFF = 0
ON = 1
class ScanType(int, enum.Enum):
"""Triggering options of the backend"""
TRIGGERED = 0
CONTINUOUS = 1
class NidaqState(int, enum.Enum):
"""Possible States of the NIDAQ backend"""
DISABLED = 0
STANDBY = 1
STAGE = 2
KICKOFF = 3
ACQUIRE = 4
UNSTAGE = 5
class ScanRates(int, enum.Enum):
"""Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
TWO_MHZ = 3
FOUR_MHZ = 4
FIVE_MHZ = 5
TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
ONE_V = 0
TWO_V = 1
FIVE_V = 2
TEN_V = 3
class EncoderFactors(int, enum.Enum):
"""Encoder Factors"""
X1_16 = 0
X1_8 = 1
X1_4 = 2
X1_2 = 3
X1 = 4
X2 = 5
X4 = 6
+1
View File
@@ -0,0 +1 @@
from .superxas_nexus_structure import SuperXASNexusStructure
@@ -0,0 +1,308 @@
from bec_server.file_writer.default_writer import DefaultFormat
import superxas_bec.bec_widgets.widgets.x10da_parameters as bl
class SuperXASNexusStructure(DefaultFormat):
"""Nexus Structure for SuperXAS"""
def format(self) -> None:
"""Specify the file format for the file writer."""
entry = self.storage.create_group(name="entry")
entry.attrs["NX_class"] = "NXentry"
instrument = entry.create_group(name="instrument")
instrument.attrs["NX_class"] = "NXinstrument"
##################
## source specific information
###################
source = instrument.create_group(name="source")
source.attrs["NX_class"] = "NXsource"
beamline_name = source.create_dataset(name="beamline_name", data="SuperXAS")
beamline_name.attrs["NX_class"] = "NX_CHAR"
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
facility_name.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data="X-ray")
probe.attrs["NX_class"] = "NX_CHAR"
if "curr" in self.device_manager.devices:
ring_current = source.create_soft_link(
name="ring_current",
target="/entry/collection/devices/curr/curr/value",
)
ring_current.attrs["NX_class"] = "NX_FLOAT"
ring_current.attrs["units"] = "mA"
###################
## mo1_bragg specific information
###################
## Logic if device exist
if "mo1_bragg" in self.device_manager.devices:
monochromator = instrument.create_group(name="monochromator")
monochromator.attrs["NX_class"] = "NXmonochromator"
crystal = monochromator.create_group(name="crystal")
crystal.attrs["NX_class"] = "NXcrystal"
# Create a dataset
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
chemical_formular.attrs["NX_class"] = "NX_CHAR"
reflection = crystal.create_soft_link(
name="reflection",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
)
reflection.attrs["NX_class"] = "NX_CHAR"
# Create a softlink
d_spacing = crystal.create_soft_link(
name="d_spacing",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
)
d_spacing.attrs["NX_class"] = "NX_FLOAT"
d_spacing.attrs["units"] = "angstrom"
bragg_offset = crystal.create_soft_link(
name="bragg_offset",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_bragg_off/value",
)
bragg_offset.attrs["NX_class"] = "NX_FLOAT"
bragg_offset.attrs["units"] = "degree"
###################
### cm mirror specific information
####################
collimating_mirror = instrument.create_group(name="collimating_mirror")
collimating_mirror.attrs["NX_class"] = "NXmirror"
cm_substrate_material = collimating_mirror.create_dataset(
name="substrate_material", data="Si"
)
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
#previous error due to space in name field
if "cm_bnd" in self.device_manager.devices:
cm_bending = collimating_mirror.create_soft_link(
name="sagittal_radius_bender_motor",
target="/entry/collection/devices/cm_bnd/cm_bnd/value",
)
cm_bending.attrs["NX_class"] = "NX_FLOAT"
cm_bending.attrs["units"] = "steps"
if "cm_rotx" in self.device_manager.devices:
cm_incidence_angle = collimating_mirror.create_soft_link(
name="incidence_angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
)
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
cm_incidence_angle.attrs["units"] = "mrad"
if "cm_roty" in self.device_manager.devices:
cm_yaw_angle = collimating_mirror.create_soft_link(
name="yaw_angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
)
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle.attrs["units"] = "mrad"
if "cm_rotz" in self.device_manager.devices:
cm_roll_angle = collimating_mirror.create_soft_link(
name="roll_angle", target="/entry/collection/devices/cm_rotz/cm_rotz/value"
)
cm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
cm_roll_angle.attrs["units"] = "mrad"
if 'cm_trx' in self.device_manager.devices:
cm_trx = - self.device_manager.devices.cm_trx.read(cached=True).get('cm_trx').get('value')
stripe = 'Unknown'
for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
if low <= cm_trx <= high:
stripe = name
cm_stripe = collimating_mirror.create_dataset(
name="stripe", data=stripe
)
cm_stripe.attrs["NX_class"] = "NX_CHAR"
###################
### fm mirror specific information
####################
focusing_mirror = instrument.create_group(name="focusing_mirror")
focusing_mirror.attrs["NX_class"] = "NXmirror"
fm_substrate_material = focusing_mirror.create_dataset(
name="substrate_material", data="Si"
)
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
if "fm_bnd" in self.device_manager.devices:
fm_bending = focusing_mirror.create_soft_link(
name="sagittal_radius_bender_motor",
target="/entry/collection/devices/fm_bnd/fm_bnd/value",
)
fm_bending.attrs["NX_class"] = "NX_FLOAT"
fm_bending.attrs["units"] = "steps"
if "fm_rotx" in self.device_manager.devices:
fm_incidence_angle = focusing_mirror.create_soft_link(
name="incidence_angle", target="/entry/collection/devices/fm_rotx/fm_rotx/value"
)
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
fm_incidence_angle.attrs["units"] = "mrad"
if "fm_roty" in self.device_manager.devices:
fm_yaw_angle = focusing_mirror.create_soft_link(
name="yaw_angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
)
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
fm_yaw_angle.attrs["units"] = "mrad"
if "fm_rotz" in self.device_manager.devices:
fm_roll_angle = focusing_mirror.create_soft_link(
name="roll_angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
)
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
fm_roll_angle.attrs["units"] = "mrad"
if 'fm_trx' in self.device_manager.devices:
fm_trx = - self.device_manager.devices.fm_trx.read(cached=True).get('fm_trx').get('value')
stripe = 'Unknown'
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if low <= fm_trx <= high:
stripe = name + ' (toroid)'
fm_stripe = focusing_mirror.create_dataset(
name="stripe", data=stripe
)
fm_stripe.attrs["NX_class"] = "NX_CHAR"
###################
## nidaq specific information
###################
## Logic if device exist
if "nidaq" in self.device_manager.devices:
#ai_chans_bits = self.device_manager.devices.nidaq.ai_chans.read(cached=True).get("nidaq_ai_chans").get("value")
ai_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ai_chans", {}).get("value")
ci_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ci_chans", {}).get("value")
#add_chans_bits = self.device_manager.devices.nidaq.add_chans.read(cached=True).get("nidaq_add_chans").get("value")
add_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
measurement_mode = entry.create_group(name="mode")
measurement_mode.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & 0x7F) != 0:
# Create a dataset
rayspec_sdd_active = measurement_mode.create_group(name="Multi_Element_Partial_Fluorescence_Yield")
me_sdd = rayspec_sdd_active.create_dataset(name="Detector", data="Rayspec 7 element Silicon Drift Detector")
me_sdd.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & (1<<8)) != 0:
# Create a dataset
ketek_sdd_active = measurement_mode.create_group(name="Single_Element_Partial_Fluorescence_Yield")
se_sdd = ketek_sdd_active.create_dataset(name="Detector", data="Ketex mini single element Silicon Drift Detector")
se_sdd.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<6)) != 0):
# Create a dataset
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
tfy = pips_active.create_dataset(name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector")
tfy.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<0)) != 0) & ((int(ai_chans_bits) & (1<<2)) != 0):
# Create a dataset
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
sam_trans = ai0ai2_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
sam_trans.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<2)) != 0) & ((int(ai_chans_bits) & (1<<4)) != 0):
# Create a dataset
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
ref_trans = ai2ai4_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
ref_trans.attrs["NX_class"] = "NX_CHAR"
main_data = entry.create_group(name="data")
main_data.attrs["NX_class"] = "NXdata"
##################
## energy, test whether the signal exists. how to check from config?
###################
energy = main_data.create_group(name="energy")
energy.attrs["NX_class"] = "NXdata"
energy.attrs["units"] = "eV"
main_data.create_soft_link(name="energy", target="/entry/collection/readout_groups/async/nidaq/nidaq_energy/value")
##################
## i0
###################
if (int(ai_chans_bits) & (1<<0)) !=0:
i0 = main_data.create_group(name="i0")
i0.attrs["NX_class"] = "NXdata"
i0.attrs["units"] = "V"
main_data.create_soft_link(name="i0", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value")
##################
## i1
###################
if (int(ai_chans_bits) & (1<<2)) !=0:
i1 = main_data.create_group(name="i1")
i1.attrs["NX_class"] = "NXdata"
i1.attrs["units"] = "V"
main_data.create_soft_link(name="i1", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value")
##################
## i2
###################
if (int(ai_chans_bits) & (1<<4)) !=0:
i2 = main_data.create_group(name="i2")
i2.attrs["NX_class"] = "NXdata"
i2.attrs["units"] = "V"
main_data.create_soft_link(name="i2", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value")
##################
## ci sum
###################
if int(ci_chans_bits) > 0:
ci_sum = main_data.create_group(name="Fluorescence_Sum")
ci_sum.attrs["NX_class"] = "NXdata"
ci_sum.attrs["units"] = "counts"
main_data.create_soft_link(name="Fluorescence_Sum", target="/entry/collection/readout_groups/async/nidaq/nidaq_cisum/value")
##################
## mu sample, test whether the signal exists. how to check from config?
###################
if (int(add_chans_bits) & (1<<0)) !=0:
mu_sample = main_data.create_group(name="mu_sample")
mu_sample.attrs["NX_class"] = "NXdata"
main_data.create_soft_link(name="mu_sample", target="/entry/collection/readout_groups/async/nidaq/nidaq_smpl_abs/value")
##################
## mu reference, test whether the signal exists. how to check from config?
###################
if (int(add_chans_bits) & (1<<1)) !=0:
mu_reference = main_data.create_group(name="mu_reference")
mu_reference.attrs["NX_class"] = "NXdata"
main_data.create_soft_link(name="mu_reference", target="/entry/collection/readout_groups/async/nidaq/nidaq_ref_abs/value")
+9 -1
View File
@@ -1 +1,9 @@
from .exafs_scan import EXAFSScan
from .exafs_scan import EXAFSScan
from .mono_bragg_scans import (
XASAdvancedScan,
# XASAdvancedScanWithXRD,
XASSimpleScan,
# XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import NIDAQContinuousScan
@@ -1,6 +1,6 @@
# from .metadata_schema_template import ExampleSchema
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = {
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
@@ -9,4 +9,4 @@ METADATA_SCHEMA_REGISTRY = {
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None
DEFAULT_SCHEMA = None
@@ -0,0 +1,8 @@
from bec_lib.metadata_schema import BasicScanMetadata
#
#
class xas_simple_scan_schema(BasicScanMetadata):
Edge: str
Element: str
+338
View File
@@ -0,0 +1,338 @@
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def stage(self):
"""call the stage procedure"""
# Compute position for mo1_gonio pre move
# Since energy is not linear to angle, we have to calculate the angles first.
pos_start = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.start,
)
pos_end = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.stop,
)
# Goniometer position is in the middle of the start and stop angle of the scan
pos = (pos_start + pos_end) / 2
# Premove with mo1_gonio
yield from self.stubs.send_rpc_and_wait(
"mo1_gonio",
"move",
position = pos,
wait = True,
timeout = 30, # 30 seconds timeout
)
# Continue with staging the devices
yield from self.stubs.stage()
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
# class XASSimpleScanWithXRD(XASSimpleScan):
# """Class for the XAS simple scan with XRD"""
# scan_name = "xas_simple_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# If scan duration is set to 0, the scan will run infinitely.
# Args:
# start (float): Start energy for the scan.
# stop (float): Stop energy for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# motor=motor,
# **kwargs,
# )
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
# class XASAdvancedScanWithXRD(XASAdvancedScan):
# """Class for the XAS advanced scan with XRD"""
# scan_name = "xas_advanced_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Spline Parameters": ["p_kink", "e_kink"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# p_kink: float,
# e_kink: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_advanced_scan is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# Start and Stop define the energy range for the scan, scan_time is the time for
# one scan cycle and scan_duration is the duration of the scan. If scan duration
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
# motion profile to slow down in the exafs region of the scan.
# Args:
# start (float): Start angle for the scan.
# stop (float): Stop angle for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# p_kink (float): Position of kink.
# e_kink (float): Energy of the kink.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# p_kink=p_kink,
# e_kink=e_kink,
# motor=motor,
# **kwargs,
# )
# self.p_kink = p_kink
# self.e_kink = e_kink
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
+84
View File
@@ -0,0 +1,84 @@
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
def __init__(
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
):
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
"""
super().__init__(**kwargs)
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
@@ -0,0 +1,216 @@
from __future__ import annotations
import queue
import threading
import time
import traceback
import h5py
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.bec_service import BECService
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.redis_connector import MessageObject, RedisConnector
from bec_lib.service_config import ServiceConfig
logger = bec_logger.logger
class NIDAQWriterService(BECService):
"""
A service that receives data from the NIDAQ through Redis and writes it continuously to a file.
"""
reshape_dataset = True
use_redis_stream = True
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
self.queue = queue.Queue()
config = self._service_config.config.get("file_writer")
self.writer_mixin = FileWriter(service_config=config)
self._scan_status_consumer = None
self._ni_data_consumer = None
self._ni_data_event = None
self._ni_writer = None
self._ni_writer_event = None
self.scan_number = None
self.scan_is_running = False
self.filename = ""
self.elapsed_time = 0
self.start_time = 0
self._start_scan_status_consumer()
self._start_ni_data_consumer()
self._start_ni_writer()
def _start_scan_status_consumer(self) -> None:
"""
Start the scan consumer.
"""
self._scan_status_consumer = self.connector.consumer(
MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self
)
self._scan_status_consumer.start()
@staticmethod
def _scan_status_callback(message: MessageObject, parent: NIDAQWriterService) -> None:
"""
Callback for scan status messages.
"""
msg = message.value
if not msg:
return
parent.handle_scan_status(msg)
def _start_ni_data_consumer(self) -> None:
"""
Start the NI data consumer.
"""
self._ni_data_event = threading.Event()
self._ni_data_consumer = threading.Thread(target=self._run_read_loop, daemon=True)
self._ni_data_consumer.start()
def _start_ni_writer(self) -> None:
"""
Start the NI data writer.
"""
self._ni_writer_event = threading.Event()
self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
self._ni_writer.start()
def _run_read_loop(self) -> None:
"""
Run the read loop.
"""
while not self._ni_data_event.is_set():
self._read_data()
def _read_data(self):
"""
Read data from Redis.
"""
if not self.scan_is_running:
time.sleep(0.01)
return
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
start_time = time.time()
if self.use_redis_stream:
msg = self.connector.xread("ni_data")
if msg:
try:
num_msgs = len(msg[0][1])
logger.debug(
f"Received {num_msgs} messages in {time.time() - start_time} seconds"
)
msgs = [messages.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
start_time = time.time()
self.handle_ni_data(msgs)
logger.debug(
f"Handled {num_msgs} messages in {time.time() - start_time} seconds"
)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
time.sleep(0.01)
else:
msgs = self.connector._redis_conn.lpop("ni_data:val", 20)
time.sleep(0.001)
if msgs:
try:
msgs = [messages.DeviceMessage.loads(msg) for msg in msgs]
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
def handle_scan_status(self, msg: messages.ScanStatusMessage) -> None:
"""
Handle scan status messages.
Args:
msg: The scan status message.
"""
status = msg.content["status"]
if status == "open":
self.scan_number = msg.content["info"].get("scan_number")
if self.scan_number is not None:
self.scan_is_running = True
else:
self.scan_is_running = False
def handle_ni_data(self, msgs: list[messages.DeviceMessage]) -> None:
"""
Receive NI data messages and write them to the writer queue.
Args:
msgs: The NI data messages.
"""
logger.info(f"Received {len(msgs)} NI data messages")
# concatenate all messages
signals = {}
for key in msgs[0].content["signals"]:
signals[key] = np.concatenate([msg.content["signals"][key]["value"] for msg in msgs])
# write data to queue
self.queue.put(signals)
def _write_data(self) -> None:
"""
Get data from the writer queue and write it to disk.
"""
while not self._ni_writer_event.is_set():
signals = self.queue.get()
logger.info(f"Remaining tasks: {self.queue.qsize()}")
self.write_data(signals)
self.queue.task_done()
def write_data(self, signals: dict) -> None:
"""
Write data to disk.
Args:
signals: The signals to write to disk.
"""
# create a new file if it doesn't exist, otherwise append to it
logger.info("Writing NI data to HDF5 file")
start_time = time.time()
if not self.filename:
return
with h5py.File(self.filename, "a") as file:
if self.reshape_dataset:
for key in signals:
# if the dataset already exists, append to it
if key in file:
dataset = file[key]
dataset.resize(dataset.shape[0] + len(signals[key]), axis=0)
dataset[-len(signals[key]) :] = signals[key]
# otherwise create a new dataset
else:
file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
else:
# get all group names
group_names = list(file.keys())
# get max dataset number
dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name]
if dataset_num:
dataset_num = max(dataset_num) + 1
else:
dataset_num = 0
group = file.create_group(f"dataset_{dataset_num}")
for key in signals:
group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
logger.info(f"Finished writing NI data in {time.time() - start_time} seconds")
@@ -0,0 +1 @@
from .NIDAQ_writer import NIDAQWriterService
@@ -0,0 +1,34 @@
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
def send_scan_status(scan_number, status):
if status == "start":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="open", info={"scan_number": scan_number}
)
elif status == "stop":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="closed", info={"scan_number": scan_number}
)
else:
raise ValueError("Unknown status")
producer = RedisConnector(["localhost:6379"]).producer()
producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
print(f"Sent scan status message {scan_status_msg}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Scan status helper")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start a new scan")
start.add_argument("--scan_number", type=int, required=True, help="Scan number")
stop = command.add_parser("stop", help="Stop the scan")
stop.add_argument("--scan_number", type=int, required=True, help="Scan number")
args = parser.parse_args()
send_scan_status(args.scan_number, args.command)
@@ -0,0 +1,44 @@
import threading
import time
import numpy as np
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
class NIDAQSim(threading.Thread):
use_redis_stream = True
def run(self):
print("NIDAQSim running")
index = 0
producer = RedisConnector(["localhost:6379"]).producer()
signal = np.asarray(range(index, index + 600000))
signals = {"signal1": signal, "signal2": signal}
msg = messages.DeviceMessage(signals=signals)
msg = msg.dumps()
messages.DeviceMessage.loads(msg)
total_time = time.time()
while True:
# if index > 1000:
# break
start = time.time()
# signals = {
# "signal1": np.asarray(range(index, index + 300000)),
# "signal2": np.asarray(range(index, index + 300000)),
# }
index += 1
if self.use_redis_stream:
producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
else:
producer.lpush("ni_data", msg, max_size=10)
time.sleep(0.5)
print(f"Elapsed time: {time.time() - start}")
if __name__ == "__main__":
NIDAQSim().start()
+29
View File
@@ -0,0 +1,29 @@
import argparse
import threading
from bec_lib import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from superxas_bec.services.NIDAQ_writer import NIDAQWriterService
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", default="", help="path to the config file")
clargs = parser.parse_args()
config_path = clargs.config
config = ServiceConfig(config_path)
bec_logger.level = bec_logger.LOGLEVEL.INFO
logger = bec_logger.logger
bec_server = NIDAQWriterService(config=config, connector_cls=RedisConnector)
try:
event = threading.Event()
# pylint: disable=E1102
logger.success("Started NIDAQ writer service")
event.wait()
except KeyboardInterrupt as e:
# bec_server.connector.raise_error("KeyboardInterrupt")
bec_server.shutdown()
event.set()
raise e