16 Commits

Author SHA1 Message Date
x10da e3865fd3be wip add adjustable images are written mode.
CI for superxas_bec / test (push) Failing after 1m17s
2026-05-13 10:00:36 +02:00
appel_c 5b14a10063 wip fixes
CI for superxas_bec / test (push) Failing after 32s
2026-05-12 12:38:26 +02:00
appel_c 19182daa47 wip bugfixes 2026-05-12 11:08:42 +02:00
x10da 67a26f231d fix timepix integration at superxas
CI for superxas_bec / test (push) Successful in 34s
2026-05-11 17:35:25 +02:00
x10da d17f3deefa Introduced nexus data structure (same as Debye)
CI for superxas_bec / test (pull_request) Successful in 34s
CI for superxas_bec / test (push) Successful in 34s
2026-05-07 14:10:40 +02:00
x10da f70ac8743d Added frontend absorber 2026-05-07 14:10:04 +02:00
x10da 2981c436db Updated config files 2026-05-07 14:10:04 +02:00
x10da 09a0bc6372 Adding signals of additional nidaq signals 2026-05-07 14:10:04 +02:00
x10da fff1e21481 Renaming of offset signals 2026-05-07 14:10:04 +02:00
x10da 2bfa7b6ca3 Change of order of nidaq signals 2026-05-07 14:10:04 +02:00
x10da 3f79b300ed Implemented logic to move goniometer to correct position. Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 36cffb72a5 Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 55bc4585e2 Added new signals 2026-05-07 14:10:04 +02:00
x10da 4f1386f5e1 Bugfix Transistionstatus in kickoff 2026-05-07 14:10:04 +02:00
x10da d687d74a74 Added mono goniometer motor 2026-05-07 14:10:04 +02:00
hitz_s aa6270fb55 introduction of mo1_bragg and nidaq 2026-05-07 14:10:04 +02:00
34 changed files with 4578 additions and 49 deletions
+4
View File
@@ -12,7 +12,11 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = [
"numpy",
"scipy",
"xrt",
"websockets",
]
@@ -0,0 +1,239 @@
"""
X10DA / SuperXAS Beamline Parameters.
This file describes the parameter of each component of the SuperXAS beamline
to be used for raytracing and geometrical calculations.
"""
import os
import numpy as np
from collections import namedtuple
import xrt.backends.raycing.materials as rm
# if os.environ.get("USE_XRT", "True").lower() in ("1", "true", "yes"):
# import xrt.backends.raycing.materials as rm # type: ignore
# else:
# class _DummyClass:
# def __init__(self, *args, **kwargs):
# pass
# class _DummyMaterials:
# Material = _DummyClass
# CrystalSi = _DummyClass
# rm = _DummyMaterials()
# XRT definitions
filterBeryl = rm.Material('Be', rho=1.85, kind='plate')
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
stripeSi = rm.Material('Si', rho=2.33)
stripePt = rm.Material('Pt', rho=21.45)
stripeRh = rm.Material('Rh', rho=12.41)
stripeCr = rm.Material('Cr', rho=7.14)
stripePyrex = rm.Material('Si', rho=2.20) # Use Si as bare element and the density of SiO2
si111_1 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # first xtal surface
si311_1 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # first xtal surface
si333_1 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # first xtal surface
si511_1 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # first xtal surface
si111_2 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # second xtal surface
si311_2 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # second xtal surface
si333_2 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # second xtal surface
si511_2 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # second xtal surface
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterBe = rm.Material('Be', rho=1.85, kind='plate')
filterSi3N4 = rm.Material(['Si', 'N'], quantities=[3, 4], rho=3.44, kind='plate')
filterAl = rm.Material('Al', rho=2.69, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
sourceHeight = 0
#Synchrotron
synchrotron = namedtuple('synchrotron', ['eE', 'eI', 'eEspread',
'eEpsilonX', 'eEpsilonZ', 'betaX', 'betaZ'])
sls1 = synchrotron(
eE = 2.4,
eI = 0.4,
eEspread=0.878e-3,
eEpsilonX=5.63,
eEpsilonZ=0.007,
betaX=0.45,
betaZ=14.4,
)
sls2 = synchrotron(
eE=2.7,
eI=0.4,
eEspread=1.147e-3,
eEpsilonX=0.156,
eEpsilonZ=0.01,
betaX=0.18,
betaZ=4.6,
)
# Source
bendingMagnet = namedtuple('bendingMagnet', ['name', 'center', 'sync', 'B0'])
sls1_29t = bendingMagnet(
name='FE-BM-SLS1-2.9T',
center=(0, 0, 0),
sync=sls1,
B0=2.9,)
sls2_21t = bendingMagnet(
name='FE-BM-SLS2-2.1T',
center=(0, 0, 0),
sync=sls1,
B0=2.1,)
# FE slits
slits = namedtuple('slits', ['name', 'center', 'maxDivH', 'maxDivV'])
feSlits = slits(
name='FE-SLITS',
center=(0, 5290, sourceHeight),
maxDivH=1.8e-3,
maxDivV=0.8e-3,)
# Filters
filt = namedtuple('filt', ['name', 'center', 'pitch', 'limPhysX', 'limPhysY', 'surface', 'material', 'thickness'])
feWindow = filt(
name='FE-WINDOW',
center=(0., 6158, sourceHeight),
pitch=np.pi/2,
limPhysX=(-6, 6),
limPhysY=(-3., 3.),
surface='None',
material=filterDiamond,
thickness=0.1,)
feWindow = feWindow._replace(surface='CVD Diamond window {0:0.0f} $\mu$m'.format(feWindow.thickness*1e3))
feFilt = filt(
name='FE-FI',
center=(0., 6590, sourceHeight),
pitch=np.pi/2,
limPhysX=(-15, 15),
limPhysY=(-10, 10),
surface='None',
material=filterGraphite,
thickness=0.25,)
feFilt = feFilt._replace(surface='Graphite filter {0:0.0f} $\mu$m'.format(feFilt.thickness*1e3))
# Collimating mirror
collimatingMirror = namedtuple('collimatingMirror', ['name',
'center', 'surface', 'material', 'limPhysX', 'limPhysY',
'limOptX', 'limOptY', 'R', 'pitch', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
cm = collimatingMirror(
name='FE-CM',
center=[0, 7618, sourceHeight],
surface=('Rh','Si','Pt'),
material=(stripeRh, stripeSi, stripePt),
limPhysX=(-30, 30),
limPhysY=(-600, 600),
limOptX=((11, -2, -21), (21, 8, -5)),
limOptY=((-500, -500, -500), (500, 500, 500)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
jack1=[0., 7210., 0.], #Tripod X, Y, Z (global)
jack2=[-210., 8310., 0.],
jack3=[210., 8310., 0.],
tx1=[0.0, -575.5], # X-Stage 1 [x, y] (local)
tx2=[0.0, 575],) # X-Stage 2
apertures = namedtuple('apertures', ['name', 'center', 'opening'])
fePS = apertures(
name='FE-PS',
center=[0, 8760, sourceHeight],
opening=[-39/2, 39/2, -10, 29]) # left, right, bottom, top
opWbBsBlock = apertures(
name='OP-WB-BS-BLOCK',
center=[0., 13606-135, sourceHeight],
opening=[-18., 18., 42, 76]) # left, right, bottom, top
opSlits = apertures(
name='OP-SLITS',
center=[0, 14145-135, sourceHeight],
opening=[-35/2, 35/2, 47.5, 82.5])
# Monochromator
monochromator = namedtuple('monochromator', ['name', 'center',
'xtal', 'material1', 'material2', 'xtalWidth', 'xtalOffsetX',
'xtalLength1', 'xtalLength2', 'xtalGap', 'rotOffset',
'heightOffset', 'braggLim', 'jack1', 'jack2', 'jack3', 'tx'])
mo1 = monochromator(
name='OP-CCM1',
center=[0., 11670-135, sourceHeight],
xtal=('Si311','Si111'),
material1=(si311_1, si111_1),
material2=(si311_2, si111_2),
xtalWidth = (20, 20),
xtalOffsetX=(-19.2, 19.2),
xtalLength1 = (60, 60),
xtalLength2 = (60, 60),
xtalGap = (8, 8),
rotOffset = 6, # not sure what it is
heightOffset = 8.5, # not sure what it is
braggLim = [4, 35],
jack1=[0., 11350., 0.], #Tripod not available!
jack2=[-400., 12350., 0.],
jack3=[400., 12350., 0.],
tx=0.0,) # X-Stage [x]
# Focusing mirror
focusingMirror = namedtuple('focusingMirror', ['name', 'center',
'surfaceToroid', 'materialToroid',
'limPhysXToroid', 'limPhysYToroid',
'limOptXToroid', 'limOptYToroid',
'R', 'pitch', 'r', 'xToroid', 'hToroid', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
fm = focusingMirror(
name='OP-FM',
center=[0., 15580-135, sourceHeight],
surfaceToroid=('Rh', 'Pt'),
materialToroid=(stripeRh, stripePt),
limPhysXToroid=(-54., 54.),
limPhysYToroid=(-565., 565.),
limOptXToroid=((90.25, 41.75), (51.75, 5.75)), # With old VME axis, no absolute value!
limOptYToroid=((-500., -500.), (500., 500.)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
r=[30, 20],
xToroid=[24.126, -22,874], # offset in local x
hToroid=[7., 11.3], # depth of the cylinder at x = xCylinder1 and x = xCylinder2.
jack1=[0., 14980., 0.],
jack2=[-75., 16180., 0.],
jack3=[75., 16180., 0.],
tx1=[0., -575.], # X-Stage 1 [x, y]
tx2=[0., 575.],) # X-Stage 2 [x, y]
ehWindow = filt(
name='EH-WINDOW',
center=(0., 22225-135, sourceHeight),
pitch=np.pi/2,
limPhysX=(-10., 10.),
limPhysY=(17.5, 92.5),
surface='None',
material=filterBe,
thickness=0.25,)
ehWindow = ehWindow._replace(surface='Beryllium window {0:0.0f} $\mu$m'.format(ehWindow.thickness*1e3))
# Sample
sample = namedtuple('sample', ['name', 'center'])
smpl = sample(
name='OP-SMPL',
center=[0, 24000-135, sourceHeight],)
@@ -1,4 +1,3 @@
#######################################
## Beam Monitors 2 and 3 -- Virtual positioners
@@ -0,0 +1,243 @@
###################################
## Frontend Absorber ##
###################################
abs:
readoutPriority: baseline
description: Frontend Absorber
deviceClass: superxas_bec.devices.absorber.Absorber
deviceConfig:
prefix: "X10DA-FE-ABS1:"
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Frontend Slits ##
###################################
sldi_trxr:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sldi_trxw:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryb:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryt:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centerx:
readoutPriority: baseline
description: Front-end slit diaphragm X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapx:
readoutPriority: baseline
description: Front-end slit diaphragm X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centery:
readoutPriority: baseline
description: Front-end slit diaphragm Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapy:
readoutPriority: baseline
description: Front-end slit diaphragm Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Collimating Mirror ##
###################################
cm_trxu:
readoutPriority: baseline
description: Collimating Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trxd:
readoutPriority: baseline
description: Collimating Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
cm_tryu:
readoutPriority: baseline
description: Collimating Mirror Y-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydr:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDR
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydw:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDW
onFailure: retry
enabled: true
softwareTrigger: false
cm_bnd:
readoutPriority: baseline
description: Collimating Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:BND
onFailure: retry
enabled: true
softwareTrigger: false
# cm_bnd_radius:
# readoutPriority: baseline
# description: Collimating Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-CM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
cm_rotx:
readoutPriority: baseline
description: Collimating Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
cm_roty:
readoutPriority: baseline
description: Collimating Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
cm_rotz:
readoutPriority: baseline
description: Collimating Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
cm_trx:
readoutPriority: baseline
description: Collimating Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_try:
readoutPriority: baseline
description: Collimating Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ztcp:
readoutPriority: baseline
description: Collimating Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_xstripe:
readoutPriority: baseline
description: Collimating Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
@@ -0,0 +1,18 @@
###################################
## SLS Machine ##
###################################
curr:
readoutPriority: baseline
description: SLS ring current
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: AGEBD-DBPM3CURR:CURRENT-AVG
deviceTags:
- machine
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
@@ -0,0 +1,396 @@
###################################
## Monochromator ##
###################################
mo1_try:
readoutPriority: baseline
description: Monochromator Y Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo1_trx:
readoutPriority: baseline
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 1 ##
###################################
# sl1_trxr:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_trxw:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryb:
# readoutPriority: baseline
# description: Optics slits 1 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryt:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm1_try:
# readoutPriority: baseline
# description: Beam Monitor 1 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM1:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centerx:
# readoutPriority: baseline
# description: Optics slits 1 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapx:
# readoutPriority: baseline
# description: Optics slits 1 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centery:
# readoutPriority: baseline
# description: Optics slits 1 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapy:
# readoutPriority: baseline
# description: Optics slits 1 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
###################################
## Focusing Mirror ##
###################################
# fm_trxu:
# readoutPriority: baseline
# description: Focusing Mirror X-translation upstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXU
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_trxd:
# readoutPriority: baseline
# description: Focusing Mirror X-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryd:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryur:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream ring
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryuw:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream wall
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUW
# onFailure: retry
# enabled: true
# softwareTrigger: false
fm_bnd:
readoutPriority: baseline
description: Focusing Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP-MI1:TRB
onFailure: retry
enabled: true
softwareTrigger: false
# fm_bnd_radius:
# readoutPriority: baseline
# description: Focusing Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-FM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
fm_rotx:
readoutPriority: baseline
description: Focusing Morror Pitch
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:pitch
onFailure: retry
enabled: true
softwareTrigger: false
fm_roty:
readoutPriority: baseline
description: Focusing Morror Yaw
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:yaw
onFailure: retry
enabled: true
softwareTrigger: false
fm_rotz:
readoutPriority: baseline
description: Focusing Morror Roll
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:roll
onFailure: retry
enabled: true
softwareTrigger: false
fm_trx:
readoutPriority: baseline
description: Focusing Morror Center Point X
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:trans
onFailure: retry
enabled: true
softwareTrigger: false
fm_try:
readoutPriority: baseline
description: Focusing Morror Center Point Y
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:y
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 2 ##
###################################
# sl2_trxr:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_trxw:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryb:
# readoutPriority: baseline
# description: Optics slits 2 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryt:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm2_try:
# readoutPriority: baseline
# description: Beam Monitor 2 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM2:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centerx:
# readoutPriority: baseline
# description: Optics slits 2 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapx:
# readoutPriority: baseline
# description: Optics slits 2 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centery:
# readoutPriority: baseline
# description: Optics slits 2 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapy:
# readoutPriority: baseline
# description: Optics slits 2 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
@@ -0,0 +1,89 @@
###################################
## General ##
###################################
## SLS Machine
machine_config:
- !include ./x10da_machine.yaml
## Beam Monitors OP + EH
# beam_monitors_config:
# - !include ./x10da_beam_monitors.yaml
###################################
## Frontend ##
###################################
## Frontend
frontend_config:
- !include ./x10da_frontend.yaml
###################################
## Optics Hutch ##
###################################
## Bragg Monochromator
mo1_gonio:
readoutPriority: baseline
description: Monochromator ROTX Goniometer
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X10DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
# mo1_bragg_angle:
# readoutPriority: baseline
# description: Positioner for the Monochromator
# deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
# deviceConfig:
# prefix: "X10DA-OP-MO1:BRAGG:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Remaining optics hutch
optics_config:
- !include ./x10da_optics.yaml
###################################
## Experimental Hutch ##
###################################
# ## NIDAQ
nidaq:
readoutPriority: monitored
description: NIDAQ backend for data reading for superxas scans
deviceClass: superxas_bec.devices.nidaq.nidaq.Nidaq
deviceConfig:
prefix: "X10DA-CPCL-SCANSERVER:"
onFailure: retry
enabled: true
softwareTrigger: false
## XAS (ICx, SDD, ref foils)
# xas_config:
# - !include ./x10da_xas.yaml
## XRD (Pilatus, pinhole, beamstop)
#xrd_config:
# - !include ./x10da_xrd.yaml
# Commented out because too slow
## Hutch cameras
# hutch_cams:
# - !include ./x10da_hutch_cameras.yaml
## Remaining experimental hutch
# es_config:
# - !include ./x10da_experimental_hutch.yaml
+72
View File
@@ -0,0 +1,72 @@
"""Frontend Absorber"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class AbsorberError(Exception):
"""Absorber specific exception"""
class STATUS(int, enum.Enum):
"""Absorber States"""
MOVING_CLOSE = 0
OPEN = 1
MOVING_OPEN = 2
CLOSED = 3
NOT_ENABLED = 4
TIMEOUT_CLOSE = 5
TIMEOUT_OPEN = 6
CLOSE_LS_LOST = 7
OPEN_LS_LOST = 8
CLOSE_LS_NOT_FREE = 9
OPEN_LS_NOT_FREE = 10
ERROR_LS = 11
TO_CONNECT = 12
MAN_OPEN = 13
UNDEFINED = 14
class Absorber(PSIDeviceBase):
"""Class for the Frontend Absorber"""
USER_ACCESS = ["open", "close"]
request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status")
status_string = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", string=True, doc="Absorber Status")
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_for_move = 10
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
def open(self) -> DeviceStatus | None:
"""Open the Absorber"""
if self.status.get() == STATUS.CLOSED:
self.request.put(1)
status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)
status = status_open
return status
else:
return None
def close(self) -> DeviceStatus | None:
"""Close the Absorber"""
if self.status.get() == STATUS.OPEN:
self.request.put(1)
status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
status = status_close
return status
else:
return None
+466
View File
@@ -0,0 +1,466 @@
"""Module for the Mo1 Bragg positioner of the SuperXAS beamline.
The softIOC is reachable via the EPICS prefix X10DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is a scan controller
to setup XAS scans. A few scan modes are programmed in the controller, e.g. simple and advanced XAS scans.
Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is
used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
TriggerControlMode,
TriggerControlSource,
)
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
# Initialise logger
logger = bec_logger.logger
########### Exceptions ###########
class Mo1BraggError(Exception):
"""Exception for the Mo1 Bragg positioner"""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
scan_duration: float | None = Field(None, description="Duration of the scan")
xrd_enable_low: bool | None = Field(
None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
) # trig_enable_low: bool = None
xrd_enable_high: bool | None = Field(
None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
) # trig_enable_high: bool = None
exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
start: float | None = Field(None, description="Start value for energy/angle")
stop: float | None = Field(None, description="Stop value for energy/angle")
p_kink: float | None = Field(None, description="P Kink")
e_kink: float | None = Field(None, description="Energy Kink")
model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"""Mo1 Bragg motor for the SuperXAS beamline.
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.timeout_for_pvwait = 7.5
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
)
else:
return
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
# logger.info(f"Sleeping for one second")
# time.sleep(1)
# logger.info(f"Device {self.name}, done sleeping")
# Load the scan parameters to the controller
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
self.cancel_on_stop(status)
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
status.wait(self.timeout_for_pvwait)
return None
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
if self.stopped is True:
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
self._stopped = False
if self.scan_control.scan_msg.get() in [
ScanControlLoadMessage.STARTED,
ScanControlLoadMessage.SUCCESS,
]:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
try:
status.wait(2)
return None
except WaitTimeoutError:
logger.warning(
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
)
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
else:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
return None
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
status = CompareStatus(self.scan_control.scan_done, 1)
self.cancel_on_stop(status)
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
scan_duration = self.scan_control.scan_duration.get()
# TODO implement better logic for infinite scans, at least bring it up with SuperXAS
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
status = TransitionStatus(
self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True,
failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
)
self.cancel_on_stop(status)
start_func(1)
return status
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
max_value = 100
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.
Args:
low (float): Low energy/angle value of the scan
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
"""
status_list = []
status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
@typechecked
def convert_angle_energy(
self, mode: Literal["AngleToEnergy", "EnergyToAngle"], inp: float
) -> float:
"""Calculate energy to angle or vice versa
Args:
mode (Literal["AngleToEnergy", "EnergyToAngle"]): Mode of calculation
input (float): Either angle or energy
Returns:
output (float): Converted angle or energy
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25) #TODO needed still? Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
def set_advanced_xas_settings(
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
) -> None:
"""Set Advanced XAS parameters for upcoming scan.
Args:
low (float): Low angle value of the scan in eV
high (float): High angle value of the scan in eV
scan_time (float): Time for a half oscillation in s
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
"""
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
pos, vel, dt = compute_spline(
low_deg=low_deg,
high_deg=high_deg,
p_kink=p_kink,
e_kink_deg=e_kink_deg,
scan_time=scan_time,
)
status_list = []
status_list.append(self.scan_settings.a_scan_pos.set(pos))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_vel.set(vel))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.a_scan_time.set(dt))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_trig_settings(
self,
enable_low: bool,
enable_high: bool,
exp_time_low: int,
exp_time_high: int,
cycle_low: int,
cycle_high: int,
) -> None:
"""Set TRIG settings for the upcoming scan.
Args:
enable_low (bool): Enable TRIG for low energy/angle
enable_high (bool): Enable TRIG for high energy/angle
num_trigger_low (int): Number of triggers for low energy/angle
num_trigger_high (int): Number of triggers for high energy/angle
exp_time_low (int): Exposure time for low energy/angle
exp_time_high (int): Exposure time for high energy/angle
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
status_list = []
status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
Args:
mode (ScanControlMode): Mode for the scan, either simple or advanced
scan_duration (float): Duration of the scan
"""
val = ScanControlMode(mode).value
status_list = []
status_list.append(self.scan_control.scan_mode_enum.set(val))
self.cancel_on_stop(status_list[-1])
status_list.append(self.scan_control.scan_duration.set(scan_duration))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
@@ -0,0 +1,20 @@
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
class Mo1BraggAngle(Mo1BraggPositioner):
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
@property
def egu(self) -> str:
"""Return the engineering unit of the positioner."""
return "deg"
@@ -0,0 +1,407 @@
"""Module for the Mo1 Bragg positioner"""
import threading
import time
import traceback
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
PositionerBase,
Signal,
)
from ophyd.utils import LimitError
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import MoveType
# Initialise logger
logger = bec_logger.logger
############# Exceptions #############
class Mo1BraggStoppedError(Exception):
"""Exception to raise when the Bragg positioner is stopped."""
############# Signal classes #############
class MoveTypeSignal(Signal):
"""Custom Signal to set the move type of the Bragg positioner"""
# pylint: disable=arguments-differ
def set(self, value: str | MoveType) -> None:
"""Returns currently active move method
Args:
value (str | MoveType) : Can be either 'energy' or 'angle'
"""
value = MoveType(value.lower())
self._readback = value.value
############# Utility devices to separate the namespace #############
class Mo1BraggStatus(Device):
"""Mo1 Bragg PVs for status monitoring"""
error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
mot_commutated = Cpt(
EpicsSignalRO, suffix="mot_commutated_RBV", kind="config", auto_monitor=True
)
axis_enabled = Cpt(EpicsSignalRO, suffix="axis_enabled_RBV", kind="config", auto_monitor=True)
heartbeat = Cpt(EpicsSignalRO, suffix="heartbeat_RBV", kind="config", auto_monitor=True)
class Mo1BraggCrystal(Device):
"""Mo1 Bragg PVs to set the crystal parameters"""
bragg_off_si111 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si111", kind="config")
bragg_off_si311 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si311", kind="config")
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
current_d_spacing = Cpt(
EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
)
current_bragg_off = Cpt(
EpicsSignalRO, suffix="current_bragg_off_RBV", kind="normal", auto_monitor=True
)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
)
current_xtal_string = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
)
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""
# TRIG settings
trig_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="trig_select_ref_ENUM", kind="config")
trig_ena_hi_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_hi_ENUM", kind="config")
trig_time_hi = Cpt(EpicsSignalWithRBV, suffix="trig_time_hi", kind="config")
trig_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_hi", kind="config")
trig_ena_lo_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_lo_ENUM", kind="config")
trig_time_lo = Cpt(EpicsSignalWithRBV, suffix="trig_time_lo", kind="config")
trig_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_lo", kind="config")
# XAS simple scan settings
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
s_scan_energy_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
)
s_scan_energy_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
)
s_scan_scantime = Cpt(
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
)
# XAS advanced scan settings
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1TriggerSettings(Device):
"""Mo1 Trigger settings"""
settle_time = Cpt(EpicsSignalWithRBV, suffix="settle_time", kind="config")
max_dev = Cpt(EpicsSignalWithRBV, suffix="max_dev", kind="config")
xrd_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_src_ENUM", kind="config")
xrd_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_mode_ENUM", kind="config")
xrd_trig_len = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_len", kind="config")
xrd_trig_req = Cpt(EpicsSignal, suffix="xrd_trig_req", kind="config")
falcon_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_src_ENUM", kind="config")
falcon_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_mode_ENUM", kind="config")
falcon_trig_len = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_len", kind="config")
falcon_trig_req = Cpt(EpicsSignal, suffix="falcon_trig_req", kind="config")
univ1_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_src_ENUM", kind="config")
univ1_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_mode_ENUM", kind="config")
univ1_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_len", kind="config")
univ1_trig_req = Cpt(EpicsSignal, suffix="univ1_trig_req", kind="config")
univ2_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_src_ENUM", kind="config")
univ2_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_mode_ENUM", kind="config")
univ2_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_len", kind="config")
univ2_trig_req = Cpt(EpicsSignal, suffix="univ2_trig_req", kind="config")
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
class Mo1BraggScanControl(Device):
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
scan_duration = Cpt(
EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
)
scan_load = Cpt(EpicsSignal, suffix="scan_load", kind="config", put_complete=True)
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
scan_start_infinite = Cpt(
EpicsSignal, suffix="scan_start_infinite", kind="config", put_complete=True
)
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="config", put_complete=True)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
)
scan_time_left = Cpt(
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
scan_spectra_done = Cpt(
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
)
scan_spectra_left = Cpt(
EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
)
class Mo1BraggPositioner(Device, PositionerBase):
"""
Positioner implementation with readback energy of the MO1 Bragg positioner.
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
This soft IOC connects to the NI motor and its control loop.
"""
USER_ACCESS = ["set_xtal"]
####### Sub-components ########
# Namespace is cleaner and easier to maintain
crystal = Cpt(Mo1BraggCrystal, "")
scan_settings = Cpt(Mo1BraggScanSettings, "")
trigger_settings = Cpt(Mo1TriggerSettings, "")
calculator = Cpt(Mo1BraggCalculator, "")
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
############# Energy PVs #############
readback = Cpt(
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
)
setpoint = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
)
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
########## Move Command PVs ##########
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config", put_complete=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
SUB_PROGRESS = "progress"
def __init__(self, prefix="", *, name: str, **kwargs):
"""Initialize the Mo1 Bragg positioner.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, **kwargs)
self._move_thread = None
self._stopped = False
self.readback.name = self.name
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner
Args:
success (bool) : Flag to indicate if the motion was successful
"""
self.move_stop.put(1)
self._stopped = True
super().stop(success=success)
def stop_scan(self) -> None:
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
self.scan_control.scan_stop.put(1)
@property
def stopped(self) -> bool:
"""Return the status of the positioner"""
return self._stopped
######### Positioner specific methods #########
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
return (self.low_lim.get(), self.high_lim.get())
@property
def low_limit(self) -> float:
"""Return low limit of axis"""
return self.limits[0]
@property
def high_limit(self) -> float:
"""Return high limit of axis"""
return self.limits[1]
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
return "eV"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
return self.readback.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Method to check if a value is within limits of the positioner.
Called by PositionerBase.move()
Args:
value (float) : value to move axis to.
"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def _move_and_finish(
self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
) -> None:
"""
Method to be called in the move thread to move the Bragg positioner
to the target position.
Args:
target_pos (float) : target position for the motion
move_cpt (Cpt) : component to set the target position on the IOC,
either setpoint or setpoint_abs_angle depending
on the move type
read_cpt (Cpt) : component to read the current position of the motion,
readback or feedback_pos_angle
status (DeviceStatus) : status object to set the status of the motion
update_frequency (float): Optional, frequency to update the current position of
the motion, defaults to 0.1s
"""
motor_name = None
try:
# Set the target position on IOC
self.setpoint.put(target_pos)
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
motor_name = self.name
while self.motor_is_moving.get() == 0:
if self.stopped:
raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
time.sleep(update_frequency)
# pylint: disable=protected-access
status.set_finished()
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(
f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
)
status.set_exception(exc=exc)
def move(self, value: float, **kwargs) -> DeviceStatus:
"""
Move the Bragg positioner to the specified value, allows to
switch between move types angle and energy.
Args:
value (float) : target value for the motion
move_type (str | MoveType) : Optional, specify the type of move,
either 'energy' or 'angle'
Returns:
DeviceStatus : status object to track the motion
"""
self._stopped = False
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, status, 0.1)
)
self._move_thread.start()
return status
# -------------- End of Positioner specific methods -----------------#
# -------------- MO1 Bragg specific methods -----------------#
def set_xtal(
self,
xtal_enum: Literal["111", "311"],
bragg_off_si111: float = None,
bragg_off_si311: float = None,
d_spacing_si111: float = None,
d_spacing_si311: float = None,
) -> None:
"""Method to set the crystal parameters of the Bragg positioner
Args:
xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
bragg_off_si111 (float) : Offset for the 111 crystal
bragg_off_si311 (float) : Offset for the 311 crystal
d_spacing_si111 (float) : d-spacing for the 111 crystal
d_spacing_si311 (float) : d-spacing for the 311 crystal
"""
if bragg_off_si111 is not None:
self.crystal.bragg_off_si111.put(bragg_off_si111)
if bragg_off_si311 is not None:
self.crystal.bragg_off_si311.put(bragg_off_si311)
if d_spacing_si111 is not None:
self.crystal.d_spacing_si111.put(d_spacing_si111)
if d_spacing_si311 is not None:
self.crystal.d_spacing_si311.put(d_spacing_si311)
if xtal_enum == "111":
crystal_set = 0
elif xtal_enum == "311":
crystal_set = 1
else:
raise ValueError(
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.put(crystal_set)
self.crystal.set_offset.put(1)
@@ -0,0 +1,61 @@
"""Enums for the Bragg positioner and trigger generator"""
import enum
class TriggerControlSource(int, enum.Enum):
"""Enum class for the trigger control source of the trigger generator"""
EPICS = 0
INPOS = 1
class TriggerControlMode(int, enum.Enum):
"""Enum class for the trigger control mode of the trigger generator"""
PULSE = 0
CONDITION = 1
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of the Bragg positioner"""
PARAMETER_WRONG = 0
VALIDATION_PENDING = 1
READY = 2
RUNNING = 3
class ScanControlLoadMessage(int, enum.Enum):
"""Enum for validating messages for load message of the Bragg positioner"""
PENDING = 0
STARTED = 1
SUCCESS = 2
ERR_TRIG_MEAS_LEN_LOW = 3
ERR_TRIG_N_TRIGGERS_LOW = 4
ERR_TRIG_TRIGS_EVERY_N_LOW = 5
ERR_TRIG_MEAS_LEN_HI = 6
ERR_TRIG_N_TRIGGERS_HI = 7
ERR_TRIG_TRIGS_EVERY_N_HI = 8
ERR_SCAN_HI_ANGLE_LIMIT = 9
ERR_SCAN_LOW_ANGLE_LIMITS = 10
ERR_SCAN_TIME = 11
ERR_SCAN_VEL_TOO_HI = 12
ERR_SCAN_ANGLE_OUT_OF_LIM = 13
ERR_SCAN_HIGH_VEL_LAR_42 = 14
ERR_SCAN_MODE_INVALID = 15
class MoveType(str, enum.Enum):
"""Enum class to switch between move types energy and angle for the Bragg positioner"""
ENERGY = "energy"
ANGLE = "angle"
class ScanControlMode(int, enum.Enum):
"""Enum class for the scan control mode of the Bragg positioner"""
SIMPLE = 0
ADVANCED = 1
@@ -0,0 +1,93 @@
"""Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline
################ Define Constants ############
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
# otherwise peak value will not be included
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
class Mo1UtilsSplineError(Exception):
"""Exception for spline computation"""
def compute_spline(
low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float
) -> tuple[float, float, float]:
"""Spline computation for the advanced scan mode
Args:
low_deg (float): Low angle value of the scan in deg
high_deg (float): High angle value of the scan in deg
scan_time (float): Time for a half oscillation in s
p_kink (float): Position of kink in %
e_kink_deg (float): Position of kink in degree
Returns:
tuple[float,float,float] : Position, Velocity and delta T arrays for the spline
"""
# increase motion range slightly so that xas trigger signals will occur at defined energy limits
low_deg = low_deg - POSITION_COMPONSATION
high_deg = high_deg + POSITION_COMPONSATION
if not (0 <= p_kink <= 100):
raise Mo1UtilsSplineError(
"Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}"
)
if not (low_deg < e_kink_deg < high_deg):
raise Mo1UtilsSplineError(
"Kink energy not within selected energy range of scan,"
+ f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"
+ f"high_deg {high_deg}."
)
tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + (
SAFETY_FACTOR - tc1
)
t_input = [
0,
SAFETY_FACTOR - tc1,
t_kink,
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
scan_time - TIME_COMPENSATE_SPLINE,
]
p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg]
cv = np.stack((t_input, p_input)).T # spline coefficients
max_param = len(cv) - DEGREE_SPLINE
kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param) # knots
spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function
p = spl(np.linspace(0, max_param, N_SAMPLES))
v = spl(np.linspace(0, max_param, N_SAMPLES), 1)
a = spl(np.linspace(0, max_param, N_SAMPLES), 2)
j = spl(np.linspace(0, max_param, N_SAMPLES), 3)
tim, pos = p.T
pos = pos + low_deg
vel = v[:, 1] / v[:, 0]
acc = []
for item in a:
acc.append(0) if item[1] == 0 else acc.append(item[1] / item[0])
jerk = []
for item in j:
jerk.append(0) if item[1] == 0 else jerk.append(item[1] / item[0])
dt = np.zeros(len(tim))
for i in np.arange(len(tim)):
if i == 0:
dt[i] = 0
else:
dt[i] = 1000 * (tim[i] - tim[i - 1])
return pos, vel, dt
+706
View File
@@ -0,0 +1,706 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from superxas_bec.devices.nidaq.nidaq_enums import (
EncoderFactors,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class NidaqError(Exception):
"""Nidaq specific error"""
class NidaqControl(Device):
"""Nidaq control class with all PVs"""
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
smpl_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
)
ref_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
)
cisum = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
)
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
)
ai1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
)
ai2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
)
ai3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
)
ai4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
)
ai5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
)
ai6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
)
ai7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
)
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
di2_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 2, MAX")
di3_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 3, MAX")
di4_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 4, MAX")
ci0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
)
ci1_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
)
ci2_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
)
ci3_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
)
ci4_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
)
ci5_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
)
ci6_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
)
ci7_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
)
ci8_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8, MEAN"
)
ci9_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9, MEAN"
)
### Readback PVs for EpicsEmitter ###
ai0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI0",
kind=Kind.normal,
doc="EPICS analog input 0",
auto_monitor=True,
)
ai1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI1",
kind=Kind.normal,
doc="EPICS analog input 1",
auto_monitor=True,
)
ai2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI2",
kind=Kind.normal,
doc="EPICS analog input 2",
auto_monitor=True,
)
ai3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI3",
kind=Kind.normal,
doc="EPICS analog input 3",
auto_monitor=True,
)
ai4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI4",
kind=Kind.normal,
doc="EPICS analog input 4",
auto_monitor=True,
)
ai5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI5",
kind=Kind.normal,
doc="EPICS analog input 5",
auto_monitor=True,
)
ai6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI6",
kind=Kind.normal,
doc="EPICS analog input 6",
auto_monitor=True,
)
ai7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-AI7",
kind=Kind.normal,
doc="EPICS analog input 7",
auto_monitor=True,
)
ci0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI0",
kind=Kind.normal,
doc="EPICS counter input 0",
auto_monitor=True,
)
ci1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI1",
kind=Kind.normal,
doc="EPICS counter input 1",
auto_monitor=True,
)
ci2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI2",
kind=Kind.normal,
doc="EPICS counter input 2",
auto_monitor=True,
)
ci3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI3",
kind=Kind.normal,
doc="EPICS counter input 3",
auto_monitor=True,
)
ci4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI4",
kind=Kind.normal,
doc="EPICS counter input 4",
auto_monitor=True,
)
ci5 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI5",
kind=Kind.normal,
doc="EPICS counter input 5",
auto_monitor=True,
)
ci6 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI6",
kind=Kind.normal,
doc="EPICS counter input 6",
auto_monitor=True,
)
ci7 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI7",
kind=Kind.normal,
doc="EPICS counter input 7",
auto_monitor=True,
)
ci8 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI8",
kind=Kind.normal,
doc="EPICS counter input 8",
auto_monitor=True,
)
ci9 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-CI9",
kind=Kind.normal,
doc="EPICS counter input 9",
auto_monitor=True,
)
di0 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI0",
kind=Kind.normal,
doc="EPICS digital input 0",
auto_monitor=True,
)
di1 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI1",
kind=Kind.normal,
doc="EPICS digital input 1",
auto_monitor=True,
)
di2 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI2",
kind=Kind.normal,
doc="EPICS digital input 2",
auto_monitor=True,
)
di3 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI3",
kind=Kind.normal,
doc="EPICS digital input 3",
auto_monitor=True,
)
di4 = Cpt(
EpicsSignalRO,
suffix="NIDAQ-DI4",
kind=Kind.normal,
doc="EPICS digital input 4",
auto_monitor=True,
)
enc_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENC",
kind=Kind.normal,
doc="EPICS Encoder reading",
auto_monitor=True,
)
energy_epics = Cpt(
EpicsSignalRO,
suffix="NIDAQ-ENERGY",
kind=Kind.normal,
doc="EPICS Energy reading",
auto_monitor=True,
)
### Readback for BEC emitter ###
ai0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
)
ai1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
)
ai2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
)
ai3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
)
ai4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
)
ai5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
)
ai6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
)
ai7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
)
ci0_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
)
ci1_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
)
ci2_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
)
ci3_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
)
ci4_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
)
ci5_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
)
ci6_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
)
ci7_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
)
ci8_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8. STD"
)
ci9_std_dev = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9. STD"
)
xas_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XAS timestamp")
# xrd_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD timestamp")
# xrd_angle = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD angle")
# xrd_energy = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD energy")
# xrd_ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 mean")
# xrd_ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 std dev")
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
### Control PVs ###
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
# enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config, auto_monitor=True)
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
add_chans = Cpt(EpicsSignal, suffix="NIDAQ-AddChans", kind=Kind.config, auto_monitor=True)
smpl_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_ln", kind=Kind.config, auto_monitor=True)
ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
class Nidaq(PSIDeviceBase, NidaqControl):
"""NIDAQ ophyd wrapper around the NIDAQ backend currently running at x10da-nidaq-01
Args:
prefix (str) : Prefix to the NIDAQ soft ioc, currently X10DA-CPCL-SCANSERVER:
name (str) : Name of the device
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_config"]
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
########################################
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_name in self.valid_scan_names:
return True
return False
def set_config(
self,
sampling_rate: Literal[
100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000
],
ai: list,
ci: list,
di: list,
scan_type: Literal["continuous", "triggered"] = "triggered",
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4",
enable_compression: bool = True,
) -> None:
"""Method to configure the NIDAQ
Args:
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
10000000, 14286000]): Sampling rate in Hz
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
otherwise continuous, default 'triggered'
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
enable_compression(bool): Enable or disable compression of data, default True
"""
if sampling_rate == 100000:
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
elif sampling_rate == 500000:
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
elif sampling_rate == 1000000:
self.sampling_rate.put(ScanRates.ONE_MHZ)
elif sampling_rate == 2000000:
self.sampling_rate.put(ScanRates.TWO_MHZ)
elif sampling_rate == 4000000:
self.sampling_rate.put(ScanRates.FOUR_MHZ)
elif sampling_rate == 5000000:
self.sampling_rate.put(ScanRates.FIVE_MHZ)
elif sampling_rate == 10000000:
self.sampling_rate.put(ScanRates.TEN_MHZ)
elif sampling_rate == 14286000:
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
ai_chans = 0
if isinstance(ai, list):
for ch in ai:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ai_chans = ai_chans | (1 << ch)
self.ai_chans.put(ai_chans)
ci_chans = 0
if isinstance(ci, list):
for ch in ci:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ci_chans = ci_chans | (1 << ch)
self.ci_chans.put(ci_chans)
di_chans = 0
if isinstance(di, list):
for ch in di:
if isinstance(ch, int):
if ch >= 0 and ch <= 4:
di_chans = di_chans | (1 << ch)
self.di_chans.put(di_chans)
if scan_type in "continuous":
self.scan_type.put(ScanType.CONTINUOUS)
elif scan_type in "triggered":
self.scan_type.put(ScanType.TRIGGERED)
if scan_duration >= 0:
self.scan_duration.put(scan_duration)
if readout_range == 1:
self.readout_range.put(ReadoutRange.ONE_V)
elif readout_range == 2:
self.readout_range.put(ReadoutRange.TWO_V)
elif readout_range == 5:
self.readout_range.put(ReadoutRange.FIVE_V)
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in "1/16":
self.encoder_factor.put(EncoderFactors.X1_16)
elif encoder_type in "1/8":
self.encoder_factor.put(EncoderFactors.X1_8)
elif encoder_type in "1/4":
self.encoder_factor.put(EncoderFactors.X1_4)
elif encoder_type in "1/2":
self.encoder_factor.put(EncoderFactors.X1_2)
elif encoder_type in "1":
self.encoder_factor.put(EncoderFactors.X1)
elif encoder_type in "2":
self.encoder_factor.put(EncoderFactors.X2)
elif encoder_type in "4":
self.encoder_factor.put(EncoderFactors.X4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
elif enable_compression is False:
self.enable_compression.put(NIDAQCompression.OFF)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
try:
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
except WaitTimeoutError:
logger.warning(f"Device {self.name} was not alive, trying to put power on")
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
self.cancel_on_stop(status)
self.power.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.time_left.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
return None
if self.state.get() != NidaqState.STANDBY:
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
self.on_stop()
status.wait(timeout=self.timeout_wait_for_signal)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
timeout=self._timeout_wait_for_pv
)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
timeout=self._timeout_wait_for_pv
)
# Stage call to IOC
status = CompareStatus(self.state, NidaqState.STAGE)
self.cancel_on_stop(status)
# TODO 11.11.25/HS64
# Switched from set to put in the hope to get rid of the rare event where nidaq is stopped at the start of a scan
# Problems consistently persisting, testing changing back to set, unconvinced this is the actual cause 14.11.25/AHC
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_kickoff(self) -> DeviceStatus | StatusBase:
"""Kickoff the Nidaq"""
status = self.kickoff_call.set(1)
self.cancel_on_stop(status)
return status
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
status.wait(timeout=self.timeout_wait_for_signal)
status = self.enable_compression.set(1)
self.cancel_on_stop(status)
status.wait(self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""
Called right before the scan starts on all devices automatically.
Here we ensure that the NIDAQ master task is running
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
status = CompareStatus(self.state, NidaqState.KICKOFF)
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""
Called to inquire if a device has completed a scans.
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
return None
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
max_value = scan_duration
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_call.put(1)
+60
View File
@@ -0,0 +1,60 @@
import enum
class NIDAQCompression(str, enum.Enum):
"""Options for Compression"""
OFF = 0
ON = 1
class ScanType(int, enum.Enum):
"""Triggering options of the backend"""
TRIGGERED = 0
CONTINUOUS = 1
class NidaqState(int, enum.Enum):
"""Possible States of the NIDAQ backend"""
DISABLED = 0
STANDBY = 1
STAGE = 2
KICKOFF = 3
ACQUIRE = 4
UNSTAGE = 5
class ScanRates(int, enum.Enum):
"""Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
TWO_MHZ = 3
FOUR_MHZ = 4
FIVE_MHZ = 5
TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
ONE_V = 0
TWO_V = 1
FIVE_V = 2
TEN_V = 3
class EncoderFactors(int, enum.Enum):
"""Encoder Factors"""
X1_16 = 0
X1_8 = 1
X1_4 = 2
X1_2 = 3
X1 = 4
X2 = 5
X4 = 6
+69 -28
View File
@@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Signal
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
EpicsSignalRO,
EpicsSignalWithRBV,
ExceptionStatus,
FileEventSignal,
PreviewSignal,
StatusBase,
@@ -134,6 +137,13 @@ class FILEWRITEMODE(int, enum.Enum):
STREAM = 2
class IMAGEMODE(int, enum.Enum):
"""Values for ImageMode PV"""
SINGLE = 0
MULTIPLE = 1
def load_pixel_map_from_json(file_path: str) -> PixelMap:
"""Load a pixel map from a JSON file.
@@ -198,6 +208,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
"set_pixel_map",
"set_pixel_map_from_json_file",
"set_enable_xes",
"set_enable_image_writing",
]
xes_data = Cpt(
@@ -511,6 +522,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""
self.enable_xes = enable
def set_enable_image_writing(self, enable: bool) -> None:
"""
Enable or disable image writing to file through the HDF5 plugin.
Args:
enable (bool): Whether to enable image writing.
"""
self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout)
@property
def enable_xes(self) -> bool:
"""Get whether XES data acquisition is enabled."""
@@ -610,6 +630,13 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
def wait_for_connection(
self, all_signals: bool = False, timeout: float | None = None, **kwargs
):
super().wait_for_connection(all_signals, timeout)
# Prepare backend for TimePixFly
self.backend.on_connected()
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
@@ -624,6 +651,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
# Set image mode to multiple
self.cam.image_mode.set(IMAGEMODE.MULTIPLE).wait(timeout=self._pv_timeout)
# ------------------
# Prepare file writing through AD HDF5 plugin
@@ -639,8 +668,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Prepare TimePixFly backend
# -----------------
# Prepare backend for TimePixFly
self.backend.on_connected()
# Register the callback for processing data received by the backend
self.backend.add_callback(self.msg_buffer_callback)
self._poll_thread.start()
@@ -693,18 +720,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
# Setup file writing for the total expected number of images
self.hdf.num_capture.set(self._n_images).wait(5)
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
if self.hdf.enable.get() == "Enable":
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
# -------------------------
# XES specific staging
@@ -740,11 +767,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_camera = CompareStatus(
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
)
status_writer = CompareStatus(
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
)
status = status_camera & status_writer
status_writer = None
if self.hdf.enable.get() == "Enable":
status_writer = CompareStatus(
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
)
if status_writer:
status = status_camera & status_writer
else:
status = status_camera
self.cancel_on_stop(status)
return status
@@ -799,12 +830,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Status Camera
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
# Status Writer
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images
status_writer = None
if self.hdf.enable.get() == "Enable":
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images & st3
# Status Backend
status_backend = None
@@ -813,9 +847,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_backend = self.backend.on_complete(status=status_backend)
# Combine the statuses
if status_backend is not None:
return_status = status_backend & status_camera & status_writer
else:
return_status = status_camera & status_writer
return_status = status_backend & return_status
if status_writer is not None:
return_status = return_status & status_writer
return_status.add_callback(self._complete_callback)
self.cancel_on_stop(return_status)
@@ -823,6 +857,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if (
self.hdf.enable.get() == "Enable"
): # TODO: Not sure if we should support disabled file writing.
return
if status.success:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
@@ -849,11 +887,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_destroy(self):
"""Cleanup method to stop the device and clean up resources."""
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
try:
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
except Exception:
logger.warning(f"Failed to destroy {self.name}.")
# pylint: disable=protected-access
@@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage',
from __future__ import annotations
import json
import signal
import socket
import threading
import time
@@ -81,14 +80,14 @@ class TimepixFlyBackend:
###### Hooks for the PSIDeviceBase interface ######
###################################################
def on_connected(self):
def on_connected(self, timeout: float = 10):
"""Called if it is ensured that the device is connected."""
time_started = time.time()
logger.info("Connecting to Timepix Fly backend...")
try:
self.timepix_fly_client.on_connected()
self.timepix_fly_client.on_connected(timeout=timeout / 2)
status = self.start_data_server()
status.wait(timeout=5)
status.wait(timeout=timeout / 2)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
@@ -239,6 +238,7 @@ class TimepixFlyBackend:
def on_destroy(self):
"""Hook for on_destroy logic."""
time_started = time.time()
self.timepix_fly_client.shutdown()
self._data_thread_shutdown_event.set()
if self._data_thread is not None and self._data_thread.is_alive():
@@ -255,6 +255,9 @@ class TimepixFlyBackend:
except Exception:
content = traceback.format_exc()
logger.error(f"Error closing socket server: {content}")
logger.info(
f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds."
)
def on_stop(self):
"""Hook for on_stop logic."""
@@ -88,7 +88,7 @@ class TimepixFlyClient:
### Utility Methods ###
#############################
def on_connected(self) -> None:
def on_connected(self, timeout: float = 5) -> None:
"""
Called when the client is connected to the TimePix server.
This method can be overridden to perform actions when the client connects.
@@ -96,7 +96,7 @@ class TimepixFlyClient:
try:
self.stop_running_collection()
self.connect()
self.wait_for_connection(timeout=5)
self.wait_for_connection(timeout=timeout)
except Exception:
content = traceback.format_exc()
@@ -270,16 +270,18 @@ class TimepixFlyClient:
continue
if status in success:
dev_status.set_finished()
logger.debug(f"Status callback finished in succes: {status.value}")
logger.debug(f"Status callback finished in success: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
try:
last_error = self.last_error()
raise TimePixStatusError(
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
f"TimePixFly state '{status.value}': {last_error.message}"
)
except Exception as e:
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
logger.error(
f"Error in status callback for '{status.value}' from TimepixFly backend: {e}"
)
dev_status.set_exception(e)
self._status_callbacks.pop(cb_id)
# Reset the _started flag if the status is in CONFIG.
+1
View File
@@ -0,0 +1 @@
from .superxas_nexus_structure import SuperXASNexusStructure
@@ -0,0 +1,308 @@
from bec_server.file_writer.default_writer import DefaultFormat
import superxas_bec.bec_widgets.widgets.x10da_parameters as bl
class SuperXASNexusStructure(DefaultFormat):
"""Nexus Structure for SuperXAS"""
def format(self) -> None:
"""Specify the file format for the file writer."""
entry = self.storage.create_group(name="entry")
entry.attrs["NX_class"] = "NXentry"
instrument = entry.create_group(name="instrument")
instrument.attrs["NX_class"] = "NXinstrument"
##################
## source specific information
###################
source = instrument.create_group(name="source")
source.attrs["NX_class"] = "NXsource"
beamline_name = source.create_dataset(name="beamline_name", data="SuperXAS")
beamline_name.attrs["NX_class"] = "NX_CHAR"
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
facility_name.attrs["NX_class"] = "NX_CHAR"
probe = source.create_dataset(name="probe", data="X-ray")
probe.attrs["NX_class"] = "NX_CHAR"
if "curr" in self.device_manager.devices:
ring_current = source.create_soft_link(
name="ring_current",
target="/entry/collection/devices/curr/curr/value",
)
ring_current.attrs["NX_class"] = "NX_FLOAT"
ring_current.attrs["units"] = "mA"
###################
## mo1_bragg specific information
###################
## Logic if device exist
if "mo1_bragg" in self.device_manager.devices:
monochromator = instrument.create_group(name="monochromator")
monochromator.attrs["NX_class"] = "NXmonochromator"
crystal = monochromator.create_group(name="crystal")
crystal.attrs["NX_class"] = "NXcrystal"
# Create a dataset
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
chemical_formular.attrs["NX_class"] = "NX_CHAR"
reflection = crystal.create_soft_link(
name="reflection",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
)
reflection.attrs["NX_class"] = "NX_CHAR"
# Create a softlink
d_spacing = crystal.create_soft_link(
name="d_spacing",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
)
d_spacing.attrs["NX_class"] = "NX_FLOAT"
d_spacing.attrs["units"] = "angstrom"
bragg_offset = crystal.create_soft_link(
name="bragg_offset",
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_bragg_off/value",
)
bragg_offset.attrs["NX_class"] = "NX_FLOAT"
bragg_offset.attrs["units"] = "degree"
###################
### cm mirror specific information
####################
collimating_mirror = instrument.create_group(name="collimating_mirror")
collimating_mirror.attrs["NX_class"] = "NXmirror"
cm_substrate_material = collimating_mirror.create_dataset(
name="substrate_material", data="Si"
)
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
#previous error due to space in name field
if "cm_bnd" in self.device_manager.devices:
cm_bending = collimating_mirror.create_soft_link(
name="sagittal_radius_bender_motor",
target="/entry/collection/devices/cm_bnd/cm_bnd/value",
)
cm_bending.attrs["NX_class"] = "NX_FLOAT"
cm_bending.attrs["units"] = "steps"
if "cm_rotx" in self.device_manager.devices:
cm_incidence_angle = collimating_mirror.create_soft_link(
name="incidence_angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
)
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
cm_incidence_angle.attrs["units"] = "mrad"
if "cm_roty" in self.device_manager.devices:
cm_yaw_angle = collimating_mirror.create_soft_link(
name="yaw_angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
)
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
cm_yaw_angle.attrs["units"] = "mrad"
if "cm_rotz" in self.device_manager.devices:
cm_roll_angle = collimating_mirror.create_soft_link(
name="roll_angle", target="/entry/collection/devices/cm_rotz/cm_rotz/value"
)
cm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
cm_roll_angle.attrs["units"] = "mrad"
if 'cm_trx' in self.device_manager.devices:
cm_trx = - self.device_manager.devices.cm_trx.read(cached=True).get('cm_trx').get('value')
stripe = 'Unknown'
for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
if low <= cm_trx <= high:
stripe = name
cm_stripe = collimating_mirror.create_dataset(
name="stripe", data=stripe
)
cm_stripe.attrs["NX_class"] = "NX_CHAR"
###################
### fm mirror specific information
####################
focusing_mirror = instrument.create_group(name="focusing_mirror")
focusing_mirror.attrs["NX_class"] = "NXmirror"
fm_substrate_material = focusing_mirror.create_dataset(
name="substrate_material", data="Si"
)
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
if "fm_bnd" in self.device_manager.devices:
fm_bending = focusing_mirror.create_soft_link(
name="sagittal_radius_bender_motor",
target="/entry/collection/devices/fm_bnd/fm_bnd/value",
)
fm_bending.attrs["NX_class"] = "NX_FLOAT"
fm_bending.attrs["units"] = "steps"
if "fm_rotx" in self.device_manager.devices:
fm_incidence_angle = focusing_mirror.create_soft_link(
name="incidence_angle", target="/entry/collection/devices/fm_rotx/fm_rotx/value"
)
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
fm_incidence_angle.attrs["units"] = "mrad"
if "fm_roty" in self.device_manager.devices:
fm_yaw_angle = focusing_mirror.create_soft_link(
name="yaw_angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
)
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
fm_yaw_angle.attrs["units"] = "mrad"
if "fm_rotz" in self.device_manager.devices:
fm_roll_angle = focusing_mirror.create_soft_link(
name="roll_angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
)
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
fm_roll_angle.attrs["units"] = "mrad"
if 'fm_trx' in self.device_manager.devices:
fm_trx = - self.device_manager.devices.fm_trx.read(cached=True).get('fm_trx').get('value')
stripe = 'Unknown'
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if low <= fm_trx <= high:
stripe = name + ' (toroid)'
fm_stripe = focusing_mirror.create_dataset(
name="stripe", data=stripe
)
fm_stripe.attrs["NX_class"] = "NX_CHAR"
###################
## nidaq specific information
###################
## Logic if device exist
if "nidaq" in self.device_manager.devices:
#ai_chans_bits = self.device_manager.devices.nidaq.ai_chans.read(cached=True).get("nidaq_ai_chans").get("value")
ai_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ai_chans", {}).get("value")
ci_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ci_chans", {}).get("value")
#add_chans_bits = self.device_manager.devices.nidaq.add_chans.read(cached=True).get("nidaq_add_chans").get("value")
add_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
measurement_mode = entry.create_group(name="mode")
measurement_mode.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & 0x7F) != 0:
# Create a dataset
rayspec_sdd_active = measurement_mode.create_group(name="Multi_Element_Partial_Fluorescence_Yield")
me_sdd = rayspec_sdd_active.create_dataset(name="Detector", data="Rayspec 7 element Silicon Drift Detector")
me_sdd.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & (1<<8)) != 0:
# Create a dataset
ketek_sdd_active = measurement_mode.create_group(name="Single_Element_Partial_Fluorescence_Yield")
se_sdd = ketek_sdd_active.create_dataset(name="Detector", data="Ketex mini single element Silicon Drift Detector")
se_sdd.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<6)) != 0):
# Create a dataset
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
tfy = pips_active.create_dataset(name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector")
tfy.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<0)) != 0) & ((int(ai_chans_bits) & (1<<2)) != 0):
# Create a dataset
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
sam_trans = ai0ai2_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
sam_trans.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1<<2)) != 0) & ((int(ai_chans_bits) & (1<<4)) != 0):
# Create a dataset
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
ref_trans = ai2ai4_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
ref_trans.attrs["NX_class"] = "NX_CHAR"
main_data = entry.create_group(name="data")
main_data.attrs["NX_class"] = "NXdata"
##################
## energy, test whether the signal exists. how to check from config?
###################
energy = main_data.create_group(name="energy")
energy.attrs["NX_class"] = "NXdata"
energy.attrs["units"] = "eV"
main_data.create_soft_link(name="energy", target="/entry/collection/readout_groups/async/nidaq/nidaq_energy/value")
##################
## i0
###################
if (int(ai_chans_bits) & (1<<0)) !=0:
i0 = main_data.create_group(name="i0")
i0.attrs["NX_class"] = "NXdata"
i0.attrs["units"] = "V"
main_data.create_soft_link(name="i0", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value")
##################
## i1
###################
if (int(ai_chans_bits) & (1<<2)) !=0:
i1 = main_data.create_group(name="i1")
i1.attrs["NX_class"] = "NXdata"
i1.attrs["units"] = "V"
main_data.create_soft_link(name="i1", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value")
##################
## i2
###################
if (int(ai_chans_bits) & (1<<4)) !=0:
i2 = main_data.create_group(name="i2")
i2.attrs["NX_class"] = "NXdata"
i2.attrs["units"] = "V"
main_data.create_soft_link(name="i2", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value")
##################
## ci sum
###################
if int(ci_chans_bits) > 0:
ci_sum = main_data.create_group(name="Fluorescence_Sum")
ci_sum.attrs["NX_class"] = "NXdata"
ci_sum.attrs["units"] = "counts"
main_data.create_soft_link(name="Fluorescence_Sum", target="/entry/collection/readout_groups/async/nidaq/nidaq_cisum/value")
##################
## mu sample, test whether the signal exists. how to check from config?
###################
if (int(add_chans_bits) & (1<<0)) !=0:
mu_sample = main_data.create_group(name="mu_sample")
mu_sample.attrs["NX_class"] = "NXdata"
main_data.create_soft_link(name="mu_sample", target="/entry/collection/readout_groups/async/nidaq/nidaq_smpl_abs/value")
##################
## mu reference, test whether the signal exists. how to check from config?
###################
if (int(add_chans_bits) & (1<<1)) !=0:
mu_reference = main_data.create_group(name="mu_reference")
mu_reference.attrs["NX_class"] = "NXdata"
main_data.create_soft_link(name="mu_reference", target="/entry/collection/readout_groups/async/nidaq/nidaq_ref_abs/value")
+9 -1
View File
@@ -1 +1,9 @@
from .exafs_scan import EXAFSScan
from .exafs_scan import EXAFSScan
from .mono_bragg_scans import (
XASAdvancedScan,
# XASAdvancedScanWithXRD,
XASSimpleScan,
# XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import NIDAQContinuousScan
@@ -1,6 +1,6 @@
# from .metadata_schema_template import ExampleSchema
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = {
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
@@ -9,4 +9,4 @@ METADATA_SCHEMA_REGISTRY = {
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None
DEFAULT_SCHEMA = None
@@ -0,0 +1,8 @@
from bec_lib.metadata_schema import BasicScanMetadata
#
#
class xas_simple_scan_schema(BasicScanMetadata):
Edge: str
Element: str
+338
View File
@@ -0,0 +1,338 @@
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def stage(self):
"""call the stage procedure"""
# Compute position for mo1_gonio pre move
# Since energy is not linear to angle, we have to calculate the angles first.
pos_start = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.start,
)
pos_end = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.stop,
)
# Goniometer position is in the middle of the start and stop angle of the scan
pos = (pos_start + pos_end) / 2
# Premove with mo1_gonio
yield from self.stubs.send_rpc_and_wait(
"mo1_gonio",
"move",
position = pos,
wait = True,
timeout = 30, # 30 seconds timeout
)
# Continue with staging the devices
yield from self.stubs.stage()
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
# class XASSimpleScanWithXRD(XASSimpleScan):
# """Class for the XAS simple scan with XRD"""
# scan_name = "xas_simple_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# If scan duration is set to 0, the scan will run infinitely.
# Args:
# start (float): Start energy for the scan.
# stop (float): Stop energy for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# motor=motor,
# **kwargs,
# )
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
# class XASAdvancedScanWithXRD(XASAdvancedScan):
# """Class for the XAS advanced scan with XRD"""
# scan_name = "xas_advanced_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Spline Parameters": ["p_kink", "e_kink"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# p_kink: float,
# e_kink: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_advanced_scan is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# Start and Stop define the energy range for the scan, scan_time is the time for
# one scan cycle and scan_duration is the duration of the scan. If scan duration
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
# motion profile to slow down in the exafs region of the scan.
# Args:
# start (float): Start angle for the scan.
# stop (float): Stop angle for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# p_kink (float): Position of kink.
# e_kink (float): Energy of the kink.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# p_kink=p_kink,
# e_kink=e_kink,
# motor=motor,
# **kwargs,
# )
# self.p_kink = p_kink
# self.e_kink = e_kink
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
+84
View File
@@ -0,0 +1,84 @@
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
def __init__(
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
):
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
"""
super().__init__(**kwargs)
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
@@ -0,0 +1,216 @@
from __future__ import annotations
import queue
import threading
import time
import traceback
import h5py
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.bec_service import BECService
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.redis_connector import MessageObject, RedisConnector
from bec_lib.service_config import ServiceConfig
logger = bec_logger.logger
class NIDAQWriterService(BECService):
"""
A service that receives data from the NIDAQ through Redis and writes it continuously to a file.
"""
reshape_dataset = True
use_redis_stream = True
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
self.queue = queue.Queue()
config = self._service_config.config.get("file_writer")
self.writer_mixin = FileWriter(service_config=config)
self._scan_status_consumer = None
self._ni_data_consumer = None
self._ni_data_event = None
self._ni_writer = None
self._ni_writer_event = None
self.scan_number = None
self.scan_is_running = False
self.filename = ""
self.elapsed_time = 0
self.start_time = 0
self._start_scan_status_consumer()
self._start_ni_data_consumer()
self._start_ni_writer()
def _start_scan_status_consumer(self) -> None:
"""
Start the scan consumer.
"""
self._scan_status_consumer = self.connector.consumer(
MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self
)
self._scan_status_consumer.start()
@staticmethod
def _scan_status_callback(message: MessageObject, parent: NIDAQWriterService) -> None:
"""
Callback for scan status messages.
"""
msg = message.value
if not msg:
return
parent.handle_scan_status(msg)
def _start_ni_data_consumer(self) -> None:
"""
Start the NI data consumer.
"""
self._ni_data_event = threading.Event()
self._ni_data_consumer = threading.Thread(target=self._run_read_loop, daemon=True)
self._ni_data_consumer.start()
def _start_ni_writer(self) -> None:
"""
Start the NI data writer.
"""
self._ni_writer_event = threading.Event()
self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
self._ni_writer.start()
def _run_read_loop(self) -> None:
"""
Run the read loop.
"""
while not self._ni_data_event.is_set():
self._read_data()
def _read_data(self):
"""
Read data from Redis.
"""
if not self.scan_is_running:
time.sleep(0.01)
return
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
start_time = time.time()
if self.use_redis_stream:
msg = self.connector.xread("ni_data")
if msg:
try:
num_msgs = len(msg[0][1])
logger.debug(
f"Received {num_msgs} messages in {time.time() - start_time} seconds"
)
msgs = [messages.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
start_time = time.time()
self.handle_ni_data(msgs)
logger.debug(
f"Handled {num_msgs} messages in {time.time() - start_time} seconds"
)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
time.sleep(0.01)
else:
msgs = self.connector._redis_conn.lpop("ni_data:val", 20)
time.sleep(0.001)
if msgs:
try:
msgs = [messages.DeviceMessage.loads(msg) for msg in msgs]
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
def handle_scan_status(self, msg: messages.ScanStatusMessage) -> None:
"""
Handle scan status messages.
Args:
msg: The scan status message.
"""
status = msg.content["status"]
if status == "open":
self.scan_number = msg.content["info"].get("scan_number")
if self.scan_number is not None:
self.scan_is_running = True
else:
self.scan_is_running = False
def handle_ni_data(self, msgs: list[messages.DeviceMessage]) -> None:
"""
Receive NI data messages and write them to the writer queue.
Args:
msgs: The NI data messages.
"""
logger.info(f"Received {len(msgs)} NI data messages")
# concatenate all messages
signals = {}
for key in msgs[0].content["signals"]:
signals[key] = np.concatenate([msg.content["signals"][key]["value"] for msg in msgs])
# write data to queue
self.queue.put(signals)
def _write_data(self) -> None:
"""
Get data from the writer queue and write it to disk.
"""
while not self._ni_writer_event.is_set():
signals = self.queue.get()
logger.info(f"Remaining tasks: {self.queue.qsize()}")
self.write_data(signals)
self.queue.task_done()
def write_data(self, signals: dict) -> None:
"""
Write data to disk.
Args:
signals: The signals to write to disk.
"""
# create a new file if it doesn't exist, otherwise append to it
logger.info("Writing NI data to HDF5 file")
start_time = time.time()
if not self.filename:
return
with h5py.File(self.filename, "a") as file:
if self.reshape_dataset:
for key in signals:
# if the dataset already exists, append to it
if key in file:
dataset = file[key]
dataset.resize(dataset.shape[0] + len(signals[key]), axis=0)
dataset[-len(signals[key]) :] = signals[key]
# otherwise create a new dataset
else:
file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
else:
# get all group names
group_names = list(file.keys())
# get max dataset number
dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name]
if dataset_num:
dataset_num = max(dataset_num) + 1
else:
dataset_num = 0
group = file.create_group(f"dataset_{dataset_num}")
for key in signals:
group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
logger.info(f"Finished writing NI data in {time.time() - start_time} seconds")
@@ -0,0 +1 @@
from .NIDAQ_writer import NIDAQWriterService
@@ -0,0 +1,34 @@
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
def send_scan_status(scan_number, status):
if status == "start":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="open", info={"scan_number": scan_number}
)
elif status == "stop":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="closed", info={"scan_number": scan_number}
)
else:
raise ValueError("Unknown status")
producer = RedisConnector(["localhost:6379"]).producer()
producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
print(f"Sent scan status message {scan_status_msg}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Scan status helper")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start a new scan")
start.add_argument("--scan_number", type=int, required=True, help="Scan number")
stop = command.add_parser("stop", help="Stop the scan")
stop.add_argument("--scan_number", type=int, required=True, help="Scan number")
args = parser.parse_args()
send_scan_status(args.scan_number, args.command)
@@ -0,0 +1,44 @@
import threading
import time
import numpy as np
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
class NIDAQSim(threading.Thread):
use_redis_stream = True
def run(self):
print("NIDAQSim running")
index = 0
producer = RedisConnector(["localhost:6379"]).producer()
signal = np.asarray(range(index, index + 600000))
signals = {"signal1": signal, "signal2": signal}
msg = messages.DeviceMessage(signals=signals)
msg = msg.dumps()
messages.DeviceMessage.loads(msg)
total_time = time.time()
while True:
# if index > 1000:
# break
start = time.time()
# signals = {
# "signal1": np.asarray(range(index, index + 300000)),
# "signal2": np.asarray(range(index, index + 300000)),
# }
index += 1
if self.use_redis_stream:
producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
else:
producer.lpush("ni_data", msg, max_size=10)
time.sleep(0.5)
print(f"Elapsed time: {time.time() - start}")
if __name__ == "__main__":
NIDAQSim().start()
+29
View File
@@ -0,0 +1,29 @@
import argparse
import threading
from bec_lib import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from superxas_bec.services.NIDAQ_writer import NIDAQWriterService
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", default="", help="path to the config file")
clargs = parser.parse_args()
config_path = clargs.config
config = ServiceConfig(config_path)
bec_logger.level = bec_logger.LOGLEVEL.INFO
logger = bec_logger.logger
bec_server = NIDAQWriterService(config=config, connector_cls=RedisConnector)
try:
event = threading.Event()
# pylint: disable=E1102
logger.success("Started NIDAQ writer service")
event.wait()
except KeyboardInterrupt as e:
# bec_server.connector.raise_error("KeyboardInterrupt")
bec_server.shutdown()
event.set()
raise e
+355
View File
@@ -0,0 +1,355 @@
"""Unit tests for the Timepix device with a mocked backend."""
from __future__ import annotations
import threading
from types import SimpleNamespace
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from superxas_bec.devices.timepix.timepix import (
ACQUIRESTATUS,
DATASOURCE,
EXPOSUREMODE,
FILEWRITEMODE,
TDCEdge,
TDCOuput,
TRIGGERMODE,
TRIGGERSOURCE,
Timepix,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
PixelMap,
)
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class FakeBackendClient:
"""Backend client double that can finish status callbacks on demand."""
def __init__(self):
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
def add_status_callback(self, status, success, error, run=True):
"""Register status callback with optional immediate completion."""
if run and self.status in success:
status.set_finished()
return
self._status_callbacks[id(status)] = (status, success, error)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve tracked status objects with a simulated backend state."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
self._status_callbacks.pop(cb_id, None)
def get_net_addresses(self):
"""Return a deterministic backend raw-data address."""
return NetAddresses(
control="127.0.0.1:8452",
address="127.0.0.1:8451",
server="127.0.0.1:8080",
)
class FakeBackend:
"""Minimal backend double used to isolate Timepix from backend integration."""
def __init__(self, *args, **kwargs):
self.hostname = kwargs.get("hostname") or "localhost"
self.socket_port = kwargs.get("socket_port", 9876)
self.timepix_fly_client = FakeBackendClient()
self.on_connected = mock.Mock()
self.on_stage = mock.Mock()
self.on_stop = mock.Mock()
self.on_destroy = mock.Mock()
self.add_callback = mock.Mock()
self._trigger_status = StatusBase()
self._trigger_status.set_finished()
self._complete_status = StatusBase()
self._complete_status.set_finished()
def on_trigger(self):
"""Return a backend-prepared trigger status."""
return self._trigger_status
def on_trigger_finished(self):
"""Return the status that resolves when acquisition is complete."""
return self._complete_status
def on_complete(self, status=None):
"""Return a backend completion status."""
if status is None:
return self._complete_status
status.set_finished()
return status
def _finished_status(device=None):
"""Create a finished status for mocked signal set operations."""
status = DeviceStatus(device=device)
status.set_finished()
return status
def _force_signal_value(signal, value):
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
if hasattr(signal, "_read_pv"):
signal._read_pv.mock_data = value
return
signal.put(value)
def _message_value(signal):
"""Extract the signal payload from a BEC message signal."""
msg = signal.get()
return msg.signals[signal.name]["value"]
@pytest.fixture(scope="function")
def timepix():
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
backend = FakeBackend()
scan_info = SimpleNamespace(
msg=SimpleNamespace(
scan_name="step_scan",
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
num_points=3,
)
)
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch(
"superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend
),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="localhost:8452",
hostname="localhost",
socket_port=9876,
scan_info=scan_info,
device_manager=DMMock(),
)
patch_dual_pvs(dev)
dev.backend = backend
dev._poll_thread = mock.Mock()
for walk in dev.walk_signals():
signal = walk.item
if hasattr(signal, "set") and hasattr(signal, "put"):
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
yield dev
dev._poll_thread_kill_event.set()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map used for stage tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
"""Connected hook should configure camera, file writer, and backend callback."""
timepix.on_connected()
assert timepix.cam.tdc1_enable.get() == 1
assert timepix.cam.tdc2_enable.get() == 1
assert timepix.cam.raw_enable.get() == 1
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
assert timepix.cam.array_counter.get() == 0
assert timepix.hdf.enable.get() == "1"
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
assert timepix.hdf.auto_save.get() == 1
assert timepix.cam.array_callbacks.get() == 1
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
timepix._poll_thread.start.assert_called_once()
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch.object(timepix.hdf.enable, "get", return_value=1),
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
timepix.on_stage()
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.hdf.file_path.get() == "/tmp/timepix"
assert timepix.hdf.file_name.get() == "test_scan.h5"
assert timepix.hdf.num_capture.get() == 6
assert timepix.hdf.capture.get() == 1
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
assert file_event.done is False
assert file_event.successful is False
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
"""When XES is disabled, the backend-specific stage call should be skipped."""
timepix.enable_xes = False
with mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/no_xes_scan.h5",
):
timepix.on_stage()
timepix.backend.on_stage.assert_not_called()
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
"""Trigger should arm the backend first and then drive the camera."""
backend_done = StatusBase()
backend_done.set_finished()
backend_finished = StatusBase()
timepix.backend._trigger_status = backend_done
timepix.backend._complete_status = backend_finished
status = timepix.on_trigger()
assert isinstance(status, StatusBase)
assert timepix.cam.acquire.get() == 1
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
backend_finished.set_finished()
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_on_complete_marks_file_event_success(timepix):
"""Complete should wait for writer/backend completion and emit a success file event."""
timepix._full_path = "/tmp/timepix/final_scan.h5"
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
with mock.patch.object(timepix.hdf.enable, "get", return_value=1):
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_status.put(0)
_force_signal_value(timepix.hdf.num_captured, 3)
status.wait(timeout=0.1)
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
assert file_event.done is True
assert file_event.successful is True
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
"""The backend message callback should populate all exported XES data signals."""
start_frame = {
"type": "StartFrame",
"Mode": "TOA",
"TRoiStart": 0,
"TRoiStep": 1,
"TRoiN": 2,
"NumEnergyPoints": 8,
"save_interval": 1,
}
data_frames = [
{
"type": "XesData",
"period": 131000,
"totalEvents": 36,
"TDSpectra": list(range(16)),
"beforeROI": 0,
"afterROI": 0,
}
]
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
timepix.troin = 2
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
expected_xes = np.array(
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
)
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
np.testing.assert_array_equal(
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
)
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
assert _message_value(timepix.total_events) == 36
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
"""Stop should stop camera acquisition and delegate backend stop."""
timepix.cam.acquire.put(1)
timepix.hdf.capture.put(1)
timepix.on_stop()
assert timepix.cam.acquire.get() == 0
assert timepix.hdf.capture.get() == 0
timepix.backend.on_stop.assert_called_once()
def test_timepix_on_destroy_cleans_up_backend(timepix):
"""Destroy should stop polling and forward cleanup to the backend."""
timepix.on_destroy()
assert timepix._poll_thread_kill_event.is_set() is True
timepix.backend.on_stop.assert_called_once()
timepix.backend.on_destroy.assert_called_once()
+191 -7
View File
@@ -1,15 +1,199 @@
"""This module tests the Timepix Fly backend functionality."""
"""Unit tests for the Timepix Fly backend."""
from __future__ import annotations
import pytest
from types import SimpleNamespace
from unittest import mock
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
import pytest
from ophyd import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import (
TimepixFlyBackend,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimePixFlyStatus,
TimePixStatusError,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
OtherConfigModel,
PixelMap,
)
class FakeTimepixFlyClient:
"""Minimal client double that can drive backend status callbacks."""
def __init__(self, rest_url: str, ws_url: str):
self.rest_url = rest_url
self.ws_url = ws_url
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
self.error_message = "boom"
self.on_connected = mock.Mock()
self.shutdown = mock.Mock()
self.start = mock.Mock()
self.stop_running_collection = mock.Mock()
self.set_other_config = mock.Mock()
self.set_pixel_map = mock.Mock()
self.get_net_addresses = mock.Mock(
return_value=NetAddresses(
control="127.0.0.1:8452",
address="127.0.0.1:8451",
server="127.0.0.1:8080",
)
)
def add_status_callback(self, status, success, error, run=True):
"""Store callbacks and optionally resolve them immediately."""
if run:
if self.status in success:
status.set_finished()
return
if self.status in error:
status.set_exception(
TimePixStatusError(f"TimePixFly state '{self.status.value}': {self.error_message}")
)
return
self._status_callbacks[id(status)] = (status, success, error)
def last_error(self):
"""Return a lightweight error object."""
return SimpleNamespace(message=self.error_message)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve stored status callbacks as if a websocket status update arrived."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(
TimePixStatusError(
f"TimePixFly state '{status_value.value}': {self.error_message}"
)
)
self._status_callbacks.pop(cb_id, None)
@pytest.fixture(scope="function")
def timepix_fly_backend():
"""Fixture for creating a Timepix Fly backend instance."""
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
yield backend
def backend_with_states():
"""Return a backend together with a helper that emits backend states."""
with mock.patch(
"superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient",
FakeTimepixFlyClient,
):
backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost")
yield backend, backend.timepix_fly_client
backend.on_destroy()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map for backend unit tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map):
"""Stage should push both config objects to the client."""
backend, client = backend_with_states
other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16)
backend.on_stage(other_config=other_config, pixel_map=pixel_map)
client.set_other_config.assert_called_once_with(other_config)
client.set_pixel_map.assert_called_once_with(pixel_map)
def test_timepix_fly_backend_trigger_callback_success(backend_with_states):
"""Trigger status should resolve once the backend reports await_connection."""
backend, state_driver = backend_with_states
status = backend.on_trigger()
assert status.done is False
state_driver.emit_status(TimePixFlyStatus.AWAIT_CONNECTION)
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_fly_backend_trigger_callback_error(backend_with_states):
"""Trigger status should fail when the backend reports an exception state."""
backend, state_driver = backend_with_states
status = backend.on_trigger()
state_driver.error_message = "failed to configure"
state_driver.emit_status(TimePixFlyStatus.EXCEPT)
with pytest.raises(TimePixStatusError, match="failed to configure"):
status.wait(timeout=0.1)
def test_timepix_fly_backend_complete_callback_success(backend_with_states):
"""Complete status should resolve when the backend goes back to config."""
backend, state_driver = backend_with_states
status = backend.on_complete()
assert status.done is True
assert status.success is True
state_driver.emit_status(TimePixFlyStatus.CONFIG)
def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states):
"""Stopping the backend should fail all tracked statuses and stop collection."""
backend, client = backend_with_states
status = StatusBase()
backend.cancel_on_stop(status)
backend.on_stop()
client.stop_running_collection.assert_called_once()
with pytest.raises(RuntimeError, match="Stop called on device"):
status.wait(timeout=0.1)
def test_timepix_fly_backend_add_and_remove_callback(backend_with_states):
"""Callbacks can be registered and removed by id."""
backend, _ = backend_with_states
cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5})
stored_cb_id = next(iter(backend.callbacks))
assert str(stored_cb_id) == cb_id
backend.remove_callback(stored_cb_id)
assert stored_cb_id not in backend.callbacks
def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states):
"""The buffered frame callback should be invoked only once EndFrame arrives."""
backend, _ = backend_with_states
received = {}
def callback(start_frame, data_frames, end_frame, scan_id):
received["start_frame"] = start_frame
received["data_frames"] = data_frames
received["end_frame"] = end_frame
received["scan_id"] = scan_id
backend.add_callback(callback, kwd={"scan_id": 7})
backend._decode_received_data(
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
)
backend._decode_received_data(
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
)
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
assert received["start_frame"]["type"] == "StartFrame"
assert received["data_frames"][0]["type"] == "XesData"
assert received["end_frame"]["type"] == "EndFrame"
assert received["scan_id"] == 7
assert backend._TimepixFlyBackend__msg_buffer == []