Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e3865fd3be | |||
| 5b14a10063 | |||
| 19182daa47 | |||
| 67a26f231d | |||
| d17f3deefa | |||
| f70ac8743d | |||
| 2981c436db | |||
| 09a0bc6372 | |||
| fff1e21481 | |||
| 2bfa7b6ca3 | |||
| 3f79b300ed | |||
| 36cffb72a5 | |||
| 55bc4585e2 | |||
| 4f1386f5e1 | |||
| d687d74a74 | |||
| aa6270fb55 |
@@ -12,7 +12,11 @@ classifiers = [
|
||||
"Programming Language :: Python :: 3",
|
||||
"Topic :: Scientific/Engineering",
|
||||
]
|
||||
|
||||
dependencies = [
|
||||
"numpy",
|
||||
"scipy",
|
||||
"xrt",
|
||||
"websockets",
|
||||
]
|
||||
|
||||
|
||||
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
X10DA / SuperXAS Beamline Parameters.
|
||||
This file describes the parameter of each component of the SuperXAS beamline
|
||||
to be used for raytracing and geometrical calculations.
|
||||
"""
|
||||
|
||||
import os
|
||||
import numpy as np
|
||||
from collections import namedtuple
|
||||
|
||||
import xrt.backends.raycing.materials as rm
|
||||
|
||||
# if os.environ.get("USE_XRT", "True").lower() in ("1", "true", "yes"):
|
||||
# import xrt.backends.raycing.materials as rm # type: ignore
|
||||
# else:
|
||||
# class _DummyClass:
|
||||
# def __init__(self, *args, **kwargs):
|
||||
# pass
|
||||
# class _DummyMaterials:
|
||||
# Material = _DummyClass
|
||||
# CrystalSi = _DummyClass
|
||||
# rm = _DummyMaterials()
|
||||
|
||||
# XRT definitions
|
||||
filterBeryl = rm.Material('Be', rho=1.85, kind='plate')
|
||||
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
|
||||
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
|
||||
|
||||
stripeSi = rm.Material('Si', rho=2.33)
|
||||
stripePt = rm.Material('Pt', rho=21.45)
|
||||
stripeRh = rm.Material('Rh', rho=12.41)
|
||||
stripeCr = rm.Material('Cr', rho=7.14)
|
||||
stripePyrex = rm.Material('Si', rho=2.20) # Use Si as bare element and the density of SiO2
|
||||
|
||||
si111_1 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # first xtal surface
|
||||
si311_1 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # first xtal surface
|
||||
si333_1 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # first xtal surface
|
||||
si511_1 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # first xtal surface
|
||||
si111_2 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # second xtal surface
|
||||
si311_2 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # second xtal surface
|
||||
si333_2 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # second xtal surface
|
||||
si511_2 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # second xtal surface
|
||||
|
||||
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
|
||||
filterBe = rm.Material('Be', rho=1.85, kind='plate')
|
||||
filterSi3N4 = rm.Material(['Si', 'N'], quantities=[3, 4], rho=3.44, kind='plate')
|
||||
filterAl = rm.Material('Al', rho=2.69, kind='plate')
|
||||
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
|
||||
|
||||
sourceHeight = 0
|
||||
|
||||
#Synchrotron
|
||||
synchrotron = namedtuple('synchrotron', ['eE', 'eI', 'eEspread',
|
||||
'eEpsilonX', 'eEpsilonZ', 'betaX', 'betaZ'])
|
||||
|
||||
sls1 = synchrotron(
|
||||
eE = 2.4,
|
||||
eI = 0.4,
|
||||
eEspread=0.878e-3,
|
||||
eEpsilonX=5.63,
|
||||
eEpsilonZ=0.007,
|
||||
betaX=0.45,
|
||||
betaZ=14.4,
|
||||
)
|
||||
|
||||
sls2 = synchrotron(
|
||||
eE=2.7,
|
||||
eI=0.4,
|
||||
eEspread=1.147e-3,
|
||||
eEpsilonX=0.156,
|
||||
eEpsilonZ=0.01,
|
||||
betaX=0.18,
|
||||
betaZ=4.6,
|
||||
)
|
||||
|
||||
# Source
|
||||
bendingMagnet = namedtuple('bendingMagnet', ['name', 'center', 'sync', 'B0'])
|
||||
|
||||
sls1_29t = bendingMagnet(
|
||||
name='FE-BM-SLS1-2.9T',
|
||||
center=(0, 0, 0),
|
||||
sync=sls1,
|
||||
B0=2.9,)
|
||||
|
||||
sls2_21t = bendingMagnet(
|
||||
name='FE-BM-SLS2-2.1T',
|
||||
center=(0, 0, 0),
|
||||
sync=sls1,
|
||||
B0=2.1,)
|
||||
|
||||
# FE slits
|
||||
slits = namedtuple('slits', ['name', 'center', 'maxDivH', 'maxDivV'])
|
||||
|
||||
feSlits = slits(
|
||||
name='FE-SLITS',
|
||||
center=(0, 5290, sourceHeight),
|
||||
maxDivH=1.8e-3,
|
||||
maxDivV=0.8e-3,)
|
||||
|
||||
# Filters
|
||||
filt = namedtuple('filt', ['name', 'center', 'pitch', 'limPhysX', 'limPhysY', 'surface', 'material', 'thickness'])
|
||||
|
||||
feWindow = filt(
|
||||
name='FE-WINDOW',
|
||||
center=(0., 6158, sourceHeight),
|
||||
pitch=np.pi/2,
|
||||
limPhysX=(-6, 6),
|
||||
limPhysY=(-3., 3.),
|
||||
surface='None',
|
||||
material=filterDiamond,
|
||||
thickness=0.1,)
|
||||
feWindow = feWindow._replace(surface='CVD Diamond window {0:0.0f} $\mu$m'.format(feWindow.thickness*1e3))
|
||||
|
||||
feFilt = filt(
|
||||
name='FE-FI',
|
||||
center=(0., 6590, sourceHeight),
|
||||
pitch=np.pi/2,
|
||||
limPhysX=(-15, 15),
|
||||
limPhysY=(-10, 10),
|
||||
surface='None',
|
||||
material=filterGraphite,
|
||||
thickness=0.25,)
|
||||
feFilt = feFilt._replace(surface='Graphite filter {0:0.0f} $\mu$m'.format(feFilt.thickness*1e3))
|
||||
|
||||
# Collimating mirror
|
||||
collimatingMirror = namedtuple('collimatingMirror', ['name',
|
||||
'center', 'surface', 'material', 'limPhysX', 'limPhysY',
|
||||
'limOptX', 'limOptY', 'R', 'pitch', 'jack1', 'jack2', 'jack3',
|
||||
'tx1', 'tx2'])
|
||||
|
||||
cm = collimatingMirror(
|
||||
name='FE-CM',
|
||||
center=[0, 7618, sourceHeight],
|
||||
surface=('Rh','Si','Pt'),
|
||||
material=(stripeRh, stripeSi, stripePt),
|
||||
limPhysX=(-30, 30),
|
||||
limPhysY=(-600, 600),
|
||||
limOptX=((11, -2, -21), (21, 8, -5)),
|
||||
limOptY=((-500, -500, -500), (500, 500, 500)),
|
||||
R=[3e6, 15e6],
|
||||
pitch=[1.4e-3, 4.5e-3],
|
||||
jack1=[0., 7210., 0.], #Tripod X, Y, Z (global)
|
||||
jack2=[-210., 8310., 0.],
|
||||
jack3=[210., 8310., 0.],
|
||||
tx1=[0.0, -575.5], # X-Stage 1 [x, y] (local)
|
||||
tx2=[0.0, 575],) # X-Stage 2
|
||||
|
||||
apertures = namedtuple('apertures', ['name', 'center', 'opening'])
|
||||
|
||||
fePS = apertures(
|
||||
name='FE-PS',
|
||||
center=[0, 8760, sourceHeight],
|
||||
opening=[-39/2, 39/2, -10, 29]) # left, right, bottom, top
|
||||
|
||||
opWbBsBlock = apertures(
|
||||
name='OP-WB-BS-BLOCK',
|
||||
center=[0., 13606-135, sourceHeight],
|
||||
opening=[-18., 18., 42, 76]) # left, right, bottom, top
|
||||
|
||||
opSlits = apertures(
|
||||
name='OP-SLITS',
|
||||
center=[0, 14145-135, sourceHeight],
|
||||
opening=[-35/2, 35/2, 47.5, 82.5])
|
||||
|
||||
# Monochromator
|
||||
monochromator = namedtuple('monochromator', ['name', 'center',
|
||||
'xtal', 'material1', 'material2', 'xtalWidth', 'xtalOffsetX',
|
||||
'xtalLength1', 'xtalLength2', 'xtalGap', 'rotOffset',
|
||||
'heightOffset', 'braggLim', 'jack1', 'jack2', 'jack3', 'tx'])
|
||||
|
||||
mo1 = monochromator(
|
||||
name='OP-CCM1',
|
||||
center=[0., 11670-135, sourceHeight],
|
||||
xtal=('Si311','Si111'),
|
||||
material1=(si311_1, si111_1),
|
||||
material2=(si311_2, si111_2),
|
||||
xtalWidth = (20, 20),
|
||||
xtalOffsetX=(-19.2, 19.2),
|
||||
xtalLength1 = (60, 60),
|
||||
xtalLength2 = (60, 60),
|
||||
xtalGap = (8, 8),
|
||||
rotOffset = 6, # not sure what it is
|
||||
heightOffset = 8.5, # not sure what it is
|
||||
braggLim = [4, 35],
|
||||
jack1=[0., 11350., 0.], #Tripod not available!
|
||||
jack2=[-400., 12350., 0.],
|
||||
jack3=[400., 12350., 0.],
|
||||
tx=0.0,) # X-Stage [x]
|
||||
|
||||
# Focusing mirror
|
||||
focusingMirror = namedtuple('focusingMirror', ['name', 'center',
|
||||
'surfaceToroid', 'materialToroid',
|
||||
'limPhysXToroid', 'limPhysYToroid',
|
||||
'limOptXToroid', 'limOptYToroid',
|
||||
'R', 'pitch', 'r', 'xToroid', 'hToroid', 'jack1', 'jack2', 'jack3',
|
||||
'tx1', 'tx2'])
|
||||
|
||||
fm = focusingMirror(
|
||||
name='OP-FM',
|
||||
center=[0., 15580-135, sourceHeight],
|
||||
surfaceToroid=('Rh', 'Pt'),
|
||||
materialToroid=(stripeRh, stripePt),
|
||||
limPhysXToroid=(-54., 54.),
|
||||
limPhysYToroid=(-565., 565.),
|
||||
limOptXToroid=((90.25, 41.75), (51.75, 5.75)), # With old VME axis, no absolute value!
|
||||
limOptYToroid=((-500., -500.), (500., 500.)),
|
||||
R=[3e6, 15e6],
|
||||
pitch=[1.4e-3, 4.5e-3],
|
||||
r=[30, 20],
|
||||
xToroid=[24.126, -22,874], # offset in local x
|
||||
hToroid=[7., 11.3], # depth of the cylinder at x = xCylinder1 and x = xCylinder2.
|
||||
jack1=[0., 14980., 0.],
|
||||
jack2=[-75., 16180., 0.],
|
||||
jack3=[75., 16180., 0.],
|
||||
tx1=[0., -575.], # X-Stage 1 [x, y]
|
||||
tx2=[0., 575.],) # X-Stage 2 [x, y]
|
||||
|
||||
ehWindow = filt(
|
||||
name='EH-WINDOW',
|
||||
center=(0., 22225-135, sourceHeight),
|
||||
pitch=np.pi/2,
|
||||
limPhysX=(-10., 10.),
|
||||
limPhysY=(17.5, 92.5),
|
||||
surface='None',
|
||||
material=filterBe,
|
||||
thickness=0.25,)
|
||||
ehWindow = ehWindow._replace(surface='Beryllium window {0:0.0f} $\mu$m'.format(ehWindow.thickness*1e3))
|
||||
|
||||
# Sample
|
||||
sample = namedtuple('sample', ['name', 'center'])
|
||||
|
||||
smpl = sample(
|
||||
name='OP-SMPL',
|
||||
center=[0, 24000-135, sourceHeight],)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
#######################################
|
||||
## Beam Monitors 2 and 3 -- Virtual positioners
|
||||
|
||||
|
||||
@@ -0,0 +1,243 @@
|
||||
|
||||
###################################
|
||||
## Frontend Absorber ##
|
||||
###################################
|
||||
|
||||
abs:
|
||||
readoutPriority: baseline
|
||||
description: Frontend Absorber
|
||||
deviceClass: superxas_bec.devices.absorber.Absorber
|
||||
deviceConfig:
|
||||
prefix: "X10DA-FE-ABS1:"
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
###################################
|
||||
## Frontend Slits ##
|
||||
###################################
|
||||
|
||||
sldi_trxr:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm X-translation Ring-edge
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:TRXR
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_trxw:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm X-translation Wall-edge
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:TRXW
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_tryb:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm Y-translation Bottom-edge
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:TRYB
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_tryt:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm X-translation Top-edge
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:TRYT
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_centerx:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm X-center
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:CENTERX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_gapx:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm X-gap
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:GAPX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_centery:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm Y-center
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:CENTERY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
sldi_gapy:
|
||||
readoutPriority: baseline
|
||||
description: Front-end slit diaphragm Y-gap
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-SLDI:GAPY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
###################################
|
||||
## Collimating Mirror ##
|
||||
###################################
|
||||
|
||||
cm_trxu:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror X-translation upstream
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:TRXU
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_trxd:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror X-translation downstream
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:TRXD
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_tryu:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror Y-translation upstream
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:TRYU
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_trydr:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror Y-translation downstream ring
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:TRYDR
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_trydw:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror Y-translation downstream wall
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:TRYDW
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_bnd:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Mirror bender
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:BND
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
# cm_bnd_radius:
|
||||
# readoutPriority: baseline
|
||||
# description: Collimating Mirror Bending Radius
|
||||
# deviceClass: ophyd.EpicsSignalRO
|
||||
# deviceConfig:
|
||||
# read_pv: X10DA-CPCL-CM:BNDFORCE
|
||||
# onFailure: retry
|
||||
# readOnly: true
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
cm_rotx:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Pitch
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:ROTX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_roty:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Yaw
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:ROTY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_rotz:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Roll
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:ROTZ
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_trx:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Center Point X
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:XTCP
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_try:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Center Point Y
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:YTCP
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_ztcp:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror Center Point Z
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:ZTCP
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
cm_xstripe:
|
||||
readoutPriority: baseline
|
||||
description: Collimating Morror X Stripe
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-FE-CM:XSTRIPE
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
@@ -0,0 +1,18 @@
|
||||
|
||||
###################################
|
||||
## SLS Machine ##
|
||||
###################################
|
||||
|
||||
curr:
|
||||
readoutPriority: baseline
|
||||
description: SLS ring current
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: AGEBD-DBPM3CURR:CURRENT-AVG
|
||||
deviceTags:
|
||||
- machine
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
@@ -0,0 +1,396 @@
|
||||
|
||||
###################################
|
||||
## Monochromator ##
|
||||
###################################
|
||||
|
||||
mo1_try:
|
||||
readoutPriority: baseline
|
||||
description: Monochromator Y Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP1-MO1:TRY
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
mo1_trx:
|
||||
readoutPriority: baseline
|
||||
description: Monochromator X Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP1-MO1:TRX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
###################################
|
||||
## Optics Slits + Beam Monitor 1 ##
|
||||
###################################
|
||||
|
||||
# sl1_trxr:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 X-translation Ring-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:TRXR
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_trxw:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 X-translation Wall-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:TRXW
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_tryb:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 Y-translation Bottom-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:TRYB
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_tryt:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 X-translation Top-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:TRYT
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# bm1_try:
|
||||
# readoutPriority: baseline
|
||||
# description: Beam Monitor 1 Y-translation
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-BM1:TRY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_centerx:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 X-center
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:CENTERX
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_gapx:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 X-gap
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:GAPX
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_centery:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 Y-center
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:CENTERY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl1_gapy:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 1 Y-gap
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL1:GAPY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
###################################
|
||||
## Focusing Mirror ##
|
||||
###################################
|
||||
|
||||
# fm_trxu:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror X-translation upstream
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-FM:TRXU
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
# fm_trxd:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror X-translation downstream
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-FM:TRXD
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
# fm_tryd:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror Y-translation downstream
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-FM:TRYD
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
# fm_tryur:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror Y-translation upstream ring
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-FM:TRYUR
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
# fm_tryuw:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror Y-translation upstream wall
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-FM:TRYUW
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
fm_bnd:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Mirror bender
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP-MI1:TRB
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
# fm_bnd_radius:
|
||||
# readoutPriority: baseline
|
||||
# description: Focusing Mirror Bending Radius
|
||||
# deviceClass: ophyd.EpicsSignalRO
|
||||
# deviceConfig:
|
||||
# read_pv: X10DA-CPCL-FM:BNDFORCE
|
||||
# onFailure: retry
|
||||
# readOnly: true
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
fm_rotx:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Morror Pitch
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: X10DA-OP-MI1:pitch
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
fm_roty:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Morror Yaw
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: X10DA-OP-MI1:yaw
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
fm_rotz:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Morror Roll
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: X10DA-OP-MI1:roll
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
fm_trx:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Morror Center Point X
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: X10DA-OP-MI1:trans
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
fm_try:
|
||||
readoutPriority: baseline
|
||||
description: Focusing Morror Center Point Y
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
auto_monitor: true
|
||||
read_pv: X10DA-OP-MI1:y
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
###################################
|
||||
## Optics Slits + Beam Monitor 2 ##
|
||||
###################################
|
||||
|
||||
# sl2_trxr:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 X-translation Ring-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:TRXR
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_trxw:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 X-translation Wall-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:TRXW
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_tryb:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 Y-translation Bottom-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:TRYB
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_tryt:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 X-translation Top-edge
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:TRYT
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# bm2_try:
|
||||
# readoutPriority: baseline
|
||||
# description: Beam Monitor 2 Y-translation
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-BM2:TRY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_centerx:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 X-center
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:CENTERX
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_gapx:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 X-gap
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:GAPX
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_centery:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 Y-center
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:CENTERY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
|
||||
# sl2_gapy:
|
||||
# readoutPriority: baseline
|
||||
# description: Optics slits 2 Y-gap
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X10DA-OP-SL2:GAPY
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - optics
|
||||
# - slits
|
||||
@@ -0,0 +1,89 @@
|
||||
|
||||
###################################
|
||||
## General ##
|
||||
###################################
|
||||
|
||||
## SLS Machine
|
||||
machine_config:
|
||||
- !include ./x10da_machine.yaml
|
||||
|
||||
## Beam Monitors OP + EH
|
||||
# beam_monitors_config:
|
||||
# - !include ./x10da_beam_monitors.yaml
|
||||
|
||||
###################################
|
||||
## Frontend ##
|
||||
###################################
|
||||
|
||||
## Frontend
|
||||
frontend_config:
|
||||
- !include ./x10da_frontend.yaml
|
||||
|
||||
###################################
|
||||
## Optics Hutch ##
|
||||
###################################
|
||||
|
||||
## Bragg Monochromator
|
||||
mo1_gonio:
|
||||
readoutPriority: baseline
|
||||
description: Monochromator ROTX Goniometer
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-OP1-MO1:ROTX
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
mo1_bragg:
|
||||
readoutPriority: baseline
|
||||
description: Positioner for the Monochromator
|
||||
deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg.Mo1Bragg
|
||||
deviceConfig:
|
||||
prefix: "X10DA-OP-MO1:BRAGG:"
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
# mo1_bragg_angle:
|
||||
# readoutPriority: baseline
|
||||
# description: Positioner for the Monochromator
|
||||
# deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
|
||||
# deviceConfig:
|
||||
# prefix: "X10DA-OP-MO1:BRAGG:"
|
||||
# onFailure: retry
|
||||
# enabled: true
|
||||
# softwareTrigger: false
|
||||
|
||||
# Remaining optics hutch
|
||||
optics_config:
|
||||
- !include ./x10da_optics.yaml
|
||||
|
||||
###################################
|
||||
## Experimental Hutch ##
|
||||
###################################
|
||||
|
||||
# ## NIDAQ
|
||||
nidaq:
|
||||
readoutPriority: monitored
|
||||
description: NIDAQ backend for data reading for superxas scans
|
||||
deviceClass: superxas_bec.devices.nidaq.nidaq.Nidaq
|
||||
deviceConfig:
|
||||
prefix: "X10DA-CPCL-SCANSERVER:"
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
## XAS (ICx, SDD, ref foils)
|
||||
# xas_config:
|
||||
# - !include ./x10da_xas.yaml
|
||||
|
||||
## XRD (Pilatus, pinhole, beamstop)
|
||||
#xrd_config:
|
||||
# - !include ./x10da_xrd.yaml
|
||||
|
||||
# Commented out because too slow
|
||||
## Hutch cameras
|
||||
# hutch_cams:
|
||||
# - !include ./x10da_hutch_cameras.yaml
|
||||
|
||||
## Remaining experimental hutch
|
||||
# es_config:
|
||||
# - !include ./x10da_experimental_hutch.yaml
|
||||
@@ -0,0 +1,72 @@
|
||||
"""Frontend Absorber"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignal, EpicsSignalRO
|
||||
from ophyd_devices import CompareStatus, DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
class AbsorberError(Exception):
|
||||
"""Absorber specific exception"""
|
||||
|
||||
class STATUS(int, enum.Enum):
|
||||
"""Absorber States"""
|
||||
|
||||
MOVING_CLOSE = 0
|
||||
OPEN = 1
|
||||
MOVING_OPEN = 2
|
||||
CLOSED = 3
|
||||
NOT_ENABLED = 4
|
||||
TIMEOUT_CLOSE = 5
|
||||
TIMEOUT_OPEN = 6
|
||||
CLOSE_LS_LOST = 7
|
||||
OPEN_LS_LOST = 8
|
||||
CLOSE_LS_NOT_FREE = 9
|
||||
OPEN_LS_NOT_FREE = 10
|
||||
ERROR_LS = 11
|
||||
TO_CONNECT = 12
|
||||
MAN_OPEN = 13
|
||||
UNDEFINED = 14
|
||||
|
||||
class Absorber(PSIDeviceBase):
|
||||
"""Class for the Frontend Absorber"""
|
||||
|
||||
USER_ACCESS = ["open", "close"]
|
||||
|
||||
request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
|
||||
status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status")
|
||||
status_string = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", string=True, doc="Absorber Status")
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
|
||||
self.timeout_for_move = 10
|
||||
# Wait for connection on all components, ensure IOC is connected
|
||||
self.wait_for_connection(all_signals=True, timeout=5)
|
||||
|
||||
def open(self) -> DeviceStatus | None:
|
||||
"""Open the Absorber"""
|
||||
if self.status.get() == STATUS.CLOSED:
|
||||
self.request.put(1)
|
||||
status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)
|
||||
status = status_open
|
||||
return status
|
||||
else:
|
||||
return None
|
||||
|
||||
def close(self) -> DeviceStatus | None:
|
||||
"""Close the Absorber"""
|
||||
if self.status.get() == STATUS.OPEN:
|
||||
self.request.put(1)
|
||||
status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
|
||||
status = status_close
|
||||
return status
|
||||
else:
|
||||
return None
|
||||
@@ -0,0 +1,466 @@
|
||||
"""Module for the Mo1 Bragg positioner of the SuperXAS beamline.
|
||||
The softIOC is reachable via the EPICS prefix X10DA-OP-MO1:BRAGG: and connected
|
||||
to a motor controller via web sockets. The Mo1 Bragg positioner is a scan controller
|
||||
to setup XAS scans. A few scan modes are programmed in the controller, e.g. simple and advanced XAS scans.
|
||||
|
||||
Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is
|
||||
used to ensure that the action is executed completely. This is believed
|
||||
to allow for a more stable execution of the action."""
|
||||
|
||||
import time
|
||||
from typing import Any, Literal
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Signal, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.errors import DeviceStopError
|
||||
from pydantic import BaseModel, Field
|
||||
from typeguard import typechecked
|
||||
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
|
||||
MoveType,
|
||||
ScanControlLoadMessage,
|
||||
ScanControlMode,
|
||||
ScanControlScanStatus,
|
||||
TriggerControlMode,
|
||||
TriggerControlSource,
|
||||
)
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
|
||||
|
||||
# Initialise logger
|
||||
logger = bec_logger.logger
|
||||
|
||||
########### Exceptions ###########
|
||||
|
||||
|
||||
class Mo1BraggError(Exception):
|
||||
"""Exception for the Mo1 Bragg positioner"""
|
||||
|
||||
|
||||
########## Scan Parameter Model ##########
|
||||
|
||||
|
||||
class ScanParameter(BaseModel):
|
||||
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
|
||||
This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
|
||||
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
|
||||
i.e. renaming or adding new parameters, need to be represented here as well."""
|
||||
|
||||
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
|
||||
scan_duration: float | None = Field(None, description="Duration of the scan")
|
||||
xrd_enable_low: bool | None = Field(
|
||||
None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
|
||||
) # trig_enable_low: bool = None
|
||||
xrd_enable_high: bool | None = Field(
|
||||
None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
|
||||
) # trig_enable_high: bool = None
|
||||
exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
|
||||
exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
|
||||
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
|
||||
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
|
||||
start: float | None = Field(None, description="Start value for energy/angle")
|
||||
stop: float | None = Field(None, description="Stop value for energy/angle")
|
||||
p_kink: float | None = Field(None, description="P Kink")
|
||||
e_kink: float | None = Field(None, description="Energy Kink")
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
########### Mo1 Bragg Motor Class ###########
|
||||
|
||||
|
||||
class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
"""Mo1 Bragg motor for the SuperXAS beamline.
|
||||
|
||||
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
|
||||
"""
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
|
||||
|
||||
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
|
||||
"""
|
||||
Initialize the PSI Device Base class.
|
||||
|
||||
Args:
|
||||
name (str) : Name of the device
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
|
||||
self.scan_parameter = ScanParameter()
|
||||
self.timeout_for_pvwait = 7.5
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
self._update_scan_parameter()
|
||||
|
||||
if scan_name == "xas_simple_scan":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
exp_time_low=0,
|
||||
exp_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
|
||||
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
|
||||
exp_time_low=self.scan_parameter.exp_time_low,
|
||||
exp_time_high=self.scan_parameter.exp_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
exp_time_low=0,
|
||||
exp_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
|
||||
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
|
||||
exp_time_low=self.scan_parameter.exp_time_low,
|
||||
exp_time_high=self.scan_parameter.exp_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
else:
|
||||
return
|
||||
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
|
||||
# logger.info(f"Sleeping for one second")
|
||||
# time.sleep(1)
|
||||
# logger.info(f"Device {self.name}, done sleeping")
|
||||
|
||||
# Load the scan parameters to the controller
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_load.put(1)
|
||||
# Wait for params to be checked from controller
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
return None
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
if self.stopped is True:
|
||||
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
|
||||
self._stopped = False
|
||||
if self.scan_control.scan_msg.get() in [
|
||||
ScanControlLoadMessage.STARTED,
|
||||
ScanControlLoadMessage.SUCCESS,
|
||||
]:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
status.wait(2)
|
||||
return None
|
||||
except WaitTimeoutError:
|
||||
logger.warning(
|
||||
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
||||
)
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
else:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
return None
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
status = CompareStatus(self.scan_control.scan_done, 1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
scan_duration = self.scan_control.scan_duration.get()
|
||||
# TODO implement better logic for infinite scans, at least bring it up with SuperXAS
|
||||
start_func = (
|
||||
self.scan_control.scan_start_infinite.put
|
||||
if scan_duration < 0.1
|
||||
else self.scan_control.scan_start_timer.put
|
||||
)
|
||||
status = TransitionStatus(
|
||||
self.scan_control.scan_status,
|
||||
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
|
||||
strict=True,
|
||||
failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
|
||||
)
|
||||
self.cancel_on_stop(status)
|
||||
start_func(1)
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.stopped = True # Needs to be set to stop motion
|
||||
|
||||
######### Utility Methods #########
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
|
||||
Args:
|
||||
value (int) : current progress value
|
||||
"""
|
||||
max_value = 100
|
||||
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
|
||||
|
||||
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
|
||||
"""Set XAS parameters for upcoming scan.
|
||||
|
||||
Args:
|
||||
low (float): Low energy/angle value of the scan
|
||||
high (float): High energy/angle value of the scan
|
||||
scan_time (float): Time for a half oscillation
|
||||
"""
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
@typechecked
|
||||
def convert_angle_energy(
|
||||
self, mode: Literal["AngleToEnergy", "EnergyToAngle"], inp: float
|
||||
) -> float:
|
||||
"""Calculate energy to angle or vice versa
|
||||
|
||||
Args:
|
||||
mode (Literal["AngleToEnergy", "EnergyToAngle"]): Mode of calculation
|
||||
input (float): Either angle or energy
|
||||
|
||||
Returns:
|
||||
output (float): Converted angle or energy
|
||||
"""
|
||||
self.calculator.calc_reset.put(0)
|
||||
self.calculator.calc_reset.put(1)
|
||||
status = CompareStatus(self.calculator.calc_done, 0)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
|
||||
if mode == "AngleToEnergy":
|
||||
self.calculator.calc_angle.put(inp)
|
||||
elif mode == "EnergyToAngle":
|
||||
self.calculator.calc_energy.put(inp)
|
||||
|
||||
status = CompareStatus(self.calculator.calc_done, 1)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
time.sleep(0.25) #TODO needed still? Needed due to update frequency of softIOC
|
||||
if mode == "AngleToEnergy":
|
||||
return self.calculator.calc_energy.get()
|
||||
elif mode == "EnergyToAngle":
|
||||
return self.calculator.calc_angle.get()
|
||||
|
||||
def set_advanced_xas_settings(
|
||||
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
|
||||
) -> None:
|
||||
"""Set Advanced XAS parameters for upcoming scan.
|
||||
|
||||
Args:
|
||||
low (float): Low angle value of the scan in eV
|
||||
high (float): High angle value of the scan in eV
|
||||
scan_time (float): Time for a half oscillation in s
|
||||
p_kink (float): Position of kink in %
|
||||
e_kink (float): Energy of kink in eV
|
||||
"""
|
||||
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
|
||||
# Angle and Energy are inverse proportional!
|
||||
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
|
||||
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
|
||||
|
||||
pos, vel, dt = compute_spline(
|
||||
low_deg=low_deg,
|
||||
high_deg=high_deg,
|
||||
p_kink=p_kink,
|
||||
e_kink_deg=e_kink_deg,
|
||||
scan_time=scan_time,
|
||||
)
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_pos.set(pos))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_vel.set(vel))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_time.set(dt))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
def set_trig_settings(
|
||||
self,
|
||||
enable_low: bool,
|
||||
enable_high: bool,
|
||||
exp_time_low: int,
|
||||
exp_time_high: int,
|
||||
cycle_low: int,
|
||||
cycle_high: int,
|
||||
) -> None:
|
||||
"""Set TRIG settings for the upcoming scan.
|
||||
|
||||
Args:
|
||||
enable_low (bool): Enable TRIG for low energy/angle
|
||||
enable_high (bool): Enable TRIG for high energy/angle
|
||||
num_trigger_low (int): Number of triggers for low energy/angle
|
||||
num_trigger_high (int): Number of triggers for high energy/angle
|
||||
exp_time_low (int): Exposure time for low energy/angle
|
||||
exp_time_high (int): Exposure time for high energy/angle
|
||||
cycle_low (int): Cycle for low energy/angle
|
||||
cycle_high (int): Cycle for high energy/angle
|
||||
"""
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
|
||||
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
|
||||
"""Set the scan control settings for the upcoming scan.
|
||||
|
||||
Args:
|
||||
mode (ScanControlMode): Mode for the scan, either simple or advanced
|
||||
scan_duration (float): Duration of the scan
|
||||
"""
|
||||
val = ScanControlMode(mode).value
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_control.scan_mode_enum.set(val))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_control.scan_duration.set(scan_duration))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scan_info parameters for the scan."""
|
||||
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
@@ -0,0 +1,20 @@
|
||||
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
|
||||
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
||||
|
||||
|
||||
class Mo1BraggAngle(Mo1BraggPositioner):
|
||||
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
|
||||
|
||||
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
|
||||
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
|
||||
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
|
||||
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
|
||||
|
||||
@property
|
||||
def egu(self) -> str:
|
||||
"""Return the engineering unit of the positioner."""
|
||||
return "deg"
|
||||
@@ -0,0 +1,407 @@
|
||||
"""Module for the Mo1 Bragg positioner"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import (
|
||||
Device,
|
||||
DeviceStatus,
|
||||
EpicsSignal,
|
||||
EpicsSignalRO,
|
||||
EpicsSignalWithRBV,
|
||||
PositionerBase,
|
||||
Signal,
|
||||
)
|
||||
from ophyd.utils import LimitError
|
||||
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import MoveType
|
||||
|
||||
# Initialise logger
|
||||
logger = bec_logger.logger
|
||||
|
||||
############# Exceptions #############
|
||||
|
||||
|
||||
class Mo1BraggStoppedError(Exception):
|
||||
"""Exception to raise when the Bragg positioner is stopped."""
|
||||
|
||||
|
||||
############# Signal classes #############
|
||||
|
||||
|
||||
class MoveTypeSignal(Signal):
|
||||
"""Custom Signal to set the move type of the Bragg positioner"""
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def set(self, value: str | MoveType) -> None:
|
||||
"""Returns currently active move method
|
||||
|
||||
Args:
|
||||
value (str | MoveType) : Can be either 'energy' or 'angle'
|
||||
"""
|
||||
|
||||
value = MoveType(value.lower())
|
||||
self._readback = value.value
|
||||
|
||||
|
||||
############# Utility devices to separate the namespace #############
|
||||
|
||||
|
||||
class Mo1BraggStatus(Device):
|
||||
"""Mo1 Bragg PVs for status monitoring"""
|
||||
|
||||
error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
|
||||
brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
|
||||
mot_commutated = Cpt(
|
||||
EpicsSignalRO, suffix="mot_commutated_RBV", kind="config", auto_monitor=True
|
||||
)
|
||||
axis_enabled = Cpt(EpicsSignalRO, suffix="axis_enabled_RBV", kind="config", auto_monitor=True)
|
||||
heartbeat = Cpt(EpicsSignalRO, suffix="heartbeat_RBV", kind="config", auto_monitor=True)
|
||||
|
||||
|
||||
class Mo1BraggCrystal(Device):
|
||||
"""Mo1 Bragg PVs to set the crystal parameters"""
|
||||
|
||||
bragg_off_si111 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si111", kind="config")
|
||||
bragg_off_si311 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si311", kind="config")
|
||||
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
|
||||
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
|
||||
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
|
||||
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
|
||||
current_d_spacing = Cpt(
|
||||
EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
current_bragg_off = Cpt(
|
||||
EpicsSignalRO, suffix="current_bragg_off_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
current_xtal = Cpt(
|
||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
|
||||
current_xtal_string = Cpt(
|
||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
|
||||
)
|
||||
|
||||
|
||||
class Mo1BraggScanSettings(Device):
|
||||
"""Mo1 Bragg PVs to set the scan setttings"""
|
||||
|
||||
# TRIG settings
|
||||
trig_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="trig_select_ref_ENUM", kind="config")
|
||||
|
||||
trig_ena_hi_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_hi_ENUM", kind="config")
|
||||
trig_time_hi = Cpt(EpicsSignalWithRBV, suffix="trig_time_hi", kind="config")
|
||||
trig_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_hi", kind="config")
|
||||
|
||||
trig_ena_lo_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_lo_ENUM", kind="config")
|
||||
trig_time_lo = Cpt(EpicsSignalWithRBV, suffix="trig_time_lo", kind="config")
|
||||
trig_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_lo", kind="config")
|
||||
|
||||
# XAS simple scan settings
|
||||
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
|
||||
s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
|
||||
s_scan_energy_lo = Cpt(
|
||||
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
|
||||
)
|
||||
s_scan_energy_hi = Cpt(
|
||||
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
|
||||
)
|
||||
s_scan_scantime = Cpt(
|
||||
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
|
||||
)
|
||||
|
||||
# XAS advanced scan settings
|
||||
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
|
||||
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
|
||||
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
|
||||
|
||||
|
||||
class Mo1TriggerSettings(Device):
|
||||
"""Mo1 Trigger settings"""
|
||||
|
||||
settle_time = Cpt(EpicsSignalWithRBV, suffix="settle_time", kind="config")
|
||||
max_dev = Cpt(EpicsSignalWithRBV, suffix="max_dev", kind="config")
|
||||
|
||||
xrd_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_src_ENUM", kind="config")
|
||||
xrd_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_mode_ENUM", kind="config")
|
||||
xrd_trig_len = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_len", kind="config")
|
||||
xrd_trig_req = Cpt(EpicsSignal, suffix="xrd_trig_req", kind="config")
|
||||
|
||||
falcon_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_src_ENUM", kind="config")
|
||||
falcon_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_mode_ENUM", kind="config")
|
||||
falcon_trig_len = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_len", kind="config")
|
||||
falcon_trig_req = Cpt(EpicsSignal, suffix="falcon_trig_req", kind="config")
|
||||
|
||||
univ1_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_src_ENUM", kind="config")
|
||||
univ1_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_mode_ENUM", kind="config")
|
||||
univ1_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_len", kind="config")
|
||||
univ1_trig_req = Cpt(EpicsSignal, suffix="univ1_trig_req", kind="config")
|
||||
|
||||
univ2_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_src_ENUM", kind="config")
|
||||
univ2_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_mode_ENUM", kind="config")
|
||||
univ2_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_len", kind="config")
|
||||
univ2_trig_req = Cpt(EpicsSignal, suffix="univ2_trig_req", kind="config")
|
||||
|
||||
|
||||
class Mo1BraggCalculator(Device):
|
||||
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
|
||||
|
||||
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
|
||||
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
|
||||
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
|
||||
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
|
||||
|
||||
|
||||
class Mo1BraggScanControl(Device):
|
||||
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
|
||||
|
||||
scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
|
||||
scan_duration = Cpt(
|
||||
EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
|
||||
)
|
||||
scan_load = Cpt(EpicsSignal, suffix="scan_load", kind="config", put_complete=True)
|
||||
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
|
||||
scan_start_infinite = Cpt(
|
||||
EpicsSignal, suffix="scan_start_infinite", kind="config", put_complete=True
|
||||
)
|
||||
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True)
|
||||
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="config", put_complete=True)
|
||||
scan_status = Cpt(
|
||||
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
|
||||
)
|
||||
scan_time_left = Cpt(
|
||||
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
|
||||
)
|
||||
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
|
||||
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True)
|
||||
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
|
||||
scan_spectra_done = Cpt(
|
||||
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
|
||||
)
|
||||
scan_spectra_left = Cpt(
|
||||
EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
|
||||
)
|
||||
|
||||
|
||||
class Mo1BraggPositioner(Device, PositionerBase):
|
||||
"""
|
||||
Positioner implementation with readback energy of the MO1 Bragg positioner.
|
||||
|
||||
The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
|
||||
This soft IOC connects to the NI motor and its control loop.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_xtal"]
|
||||
|
||||
####### Sub-components ########
|
||||
# Namespace is cleaner and easier to maintain
|
||||
crystal = Cpt(Mo1BraggCrystal, "")
|
||||
scan_settings = Cpt(Mo1BraggScanSettings, "")
|
||||
trigger_settings = Cpt(Mo1TriggerSettings, "")
|
||||
calculator = Cpt(Mo1BraggCalculator, "")
|
||||
scan_control = Cpt(Mo1BraggScanControl, "")
|
||||
status = Cpt(Mo1BraggStatus, "")
|
||||
|
||||
############# Energy PVs #############
|
||||
|
||||
readback = Cpt(
|
||||
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
|
||||
)
|
||||
setpoint = Cpt(
|
||||
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
|
||||
)
|
||||
motor_is_moving = Cpt(
|
||||
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
|
||||
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
|
||||
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
|
||||
|
||||
########## Move Command PVs ##########
|
||||
|
||||
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
|
||||
move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config", put_complete=True)
|
||||
|
||||
SUB_READBACK = "readback"
|
||||
_default_sub = SUB_READBACK
|
||||
SUB_PROGRESS = "progress"
|
||||
|
||||
def __init__(self, prefix="", *, name: str, **kwargs):
|
||||
"""Initialize the Mo1 Bragg positioner.
|
||||
|
||||
Args:
|
||||
prefix (str): EPICS prefix for the device
|
||||
name (str): Name of the device
|
||||
kwargs: Additional keyword arguments
|
||||
"""
|
||||
super().__init__(prefix, name=name, **kwargs)
|
||||
self._move_thread = None
|
||||
self._stopped = False
|
||||
self.readback.name = self.name
|
||||
|
||||
def stop(self, *, success=False) -> None:
|
||||
"""Stop any motion on the positioner
|
||||
|
||||
Args:
|
||||
success (bool) : Flag to indicate if the motion was successful
|
||||
"""
|
||||
self.move_stop.put(1)
|
||||
self._stopped = True
|
||||
super().stop(success=success)
|
||||
|
||||
def stop_scan(self) -> None:
|
||||
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
|
||||
self.scan_control.scan_stop.put(1)
|
||||
|
||||
@property
|
||||
def stopped(self) -> bool:
|
||||
"""Return the status of the positioner"""
|
||||
return self._stopped
|
||||
|
||||
######### Positioner specific methods #########
|
||||
|
||||
@property
|
||||
def limits(self) -> tuple:
|
||||
"""Return limits of the Bragg positioner"""
|
||||
return (self.low_lim.get(), self.high_lim.get())
|
||||
|
||||
@property
|
||||
def low_limit(self) -> float:
|
||||
"""Return low limit of axis"""
|
||||
return self.limits[0]
|
||||
|
||||
@property
|
||||
def high_limit(self) -> float:
|
||||
"""Return high limit of axis"""
|
||||
return self.limits[1]
|
||||
|
||||
@property
|
||||
def egu(self) -> str:
|
||||
"""Return the engineering units of the positioner"""
|
||||
return "eV"
|
||||
|
||||
@property
|
||||
def position(self) -> float:
|
||||
"""Return the current position of Mo1Bragg, considering the move type"""
|
||||
return self.readback.get()
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def check_value(self, value: float) -> None:
|
||||
"""Method to check if a value is within limits of the positioner.
|
||||
Called by PositionerBase.move()
|
||||
|
||||
Args:
|
||||
value (float) : value to move axis to.
|
||||
"""
|
||||
low_limit, high_limit = self.limits
|
||||
|
||||
if low_limit < high_limit and not low_limit <= value <= high_limit:
|
||||
raise LimitError(f"position={value} not within limits {self.limits}")
|
||||
|
||||
def _move_and_finish(
|
||||
self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
|
||||
) -> None:
|
||||
"""
|
||||
Method to be called in the move thread to move the Bragg positioner
|
||||
to the target position.
|
||||
|
||||
Args:
|
||||
target_pos (float) : target position for the motion
|
||||
move_cpt (Cpt) : component to set the target position on the IOC,
|
||||
either setpoint or setpoint_abs_angle depending
|
||||
on the move type
|
||||
read_cpt (Cpt) : component to read the current position of the motion,
|
||||
readback or feedback_pos_angle
|
||||
status (DeviceStatus) : status object to set the status of the motion
|
||||
update_frequency (float): Optional, frequency to update the current position of
|
||||
the motion, defaults to 0.1s
|
||||
"""
|
||||
motor_name = None
|
||||
try:
|
||||
# Set the target position on IOC
|
||||
self.setpoint.put(target_pos)
|
||||
self.move_abs.put(1)
|
||||
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
|
||||
time.sleep(0.5)
|
||||
motor_name = self.name
|
||||
while self.motor_is_moving.get() == 0:
|
||||
if self.stopped:
|
||||
raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
|
||||
time.sleep(update_frequency)
|
||||
# pylint: disable=protected-access
|
||||
status.set_finished()
|
||||
# pylint: disable=broad-except
|
||||
except Exception as exc:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
|
||||
)
|
||||
status.set_exception(exc=exc)
|
||||
|
||||
def move(self, value: float, **kwargs) -> DeviceStatus:
|
||||
"""
|
||||
Move the Bragg positioner to the specified value, allows to
|
||||
switch between move types angle and energy.
|
||||
|
||||
Args:
|
||||
value (float) : target value for the motion
|
||||
move_type (str | MoveType) : Optional, specify the type of move,
|
||||
either 'energy' or 'angle'
|
||||
|
||||
Returns:
|
||||
DeviceStatus : status object to track the motion
|
||||
"""
|
||||
self._stopped = False
|
||||
|
||||
self.check_value(value)
|
||||
status = DeviceStatus(device=self)
|
||||
|
||||
self._move_thread = threading.Thread(
|
||||
target=self._move_and_finish, args=(value, status, 0.1)
|
||||
)
|
||||
self._move_thread.start()
|
||||
return status
|
||||
|
||||
# -------------- End of Positioner specific methods -----------------#
|
||||
|
||||
# -------------- MO1 Bragg specific methods -----------------#
|
||||
|
||||
def set_xtal(
|
||||
self,
|
||||
xtal_enum: Literal["111", "311"],
|
||||
bragg_off_si111: float = None,
|
||||
bragg_off_si311: float = None,
|
||||
d_spacing_si111: float = None,
|
||||
d_spacing_si311: float = None,
|
||||
) -> None:
|
||||
"""Method to set the crystal parameters of the Bragg positioner
|
||||
|
||||
Args:
|
||||
xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
|
||||
bragg_off_si111 (float) : Offset for the 111 crystal
|
||||
bragg_off_si311 (float) : Offset for the 311 crystal
|
||||
d_spacing_si111 (float) : d-spacing for the 111 crystal
|
||||
d_spacing_si311 (float) : d-spacing for the 311 crystal
|
||||
"""
|
||||
if bragg_off_si111 is not None:
|
||||
self.crystal.bragg_off_si111.put(bragg_off_si111)
|
||||
if bragg_off_si311 is not None:
|
||||
self.crystal.bragg_off_si311.put(bragg_off_si311)
|
||||
if d_spacing_si111 is not None:
|
||||
self.crystal.d_spacing_si111.put(d_spacing_si111)
|
||||
if d_spacing_si311 is not None:
|
||||
self.crystal.d_spacing_si311.put(d_spacing_si311)
|
||||
if xtal_enum == "111":
|
||||
crystal_set = 0
|
||||
elif xtal_enum == "311":
|
||||
crystal_set = 1
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
|
||||
)
|
||||
self.crystal.xtal_enum.put(crystal_set)
|
||||
self.crystal.set_offset.put(1)
|
||||
@@ -0,0 +1,61 @@
|
||||
"""Enums for the Bragg positioner and trigger generator"""
|
||||
|
||||
import enum
|
||||
|
||||
|
||||
class TriggerControlSource(int, enum.Enum):
|
||||
"""Enum class for the trigger control source of the trigger generator"""
|
||||
|
||||
EPICS = 0
|
||||
INPOS = 1
|
||||
|
||||
|
||||
class TriggerControlMode(int, enum.Enum):
|
||||
"""Enum class for the trigger control mode of the trigger generator"""
|
||||
|
||||
PULSE = 0
|
||||
CONDITION = 1
|
||||
|
||||
|
||||
class ScanControlScanStatus(int, enum.Enum):
|
||||
"""Enum class for the scan status of the Bragg positioner"""
|
||||
|
||||
PARAMETER_WRONG = 0
|
||||
VALIDATION_PENDING = 1
|
||||
READY = 2
|
||||
RUNNING = 3
|
||||
|
||||
|
||||
class ScanControlLoadMessage(int, enum.Enum):
|
||||
"""Enum for validating messages for load message of the Bragg positioner"""
|
||||
|
||||
PENDING = 0
|
||||
STARTED = 1
|
||||
SUCCESS = 2
|
||||
ERR_TRIG_MEAS_LEN_LOW = 3
|
||||
ERR_TRIG_N_TRIGGERS_LOW = 4
|
||||
ERR_TRIG_TRIGS_EVERY_N_LOW = 5
|
||||
ERR_TRIG_MEAS_LEN_HI = 6
|
||||
ERR_TRIG_N_TRIGGERS_HI = 7
|
||||
ERR_TRIG_TRIGS_EVERY_N_HI = 8
|
||||
ERR_SCAN_HI_ANGLE_LIMIT = 9
|
||||
ERR_SCAN_LOW_ANGLE_LIMITS = 10
|
||||
ERR_SCAN_TIME = 11
|
||||
ERR_SCAN_VEL_TOO_HI = 12
|
||||
ERR_SCAN_ANGLE_OUT_OF_LIM = 13
|
||||
ERR_SCAN_HIGH_VEL_LAR_42 = 14
|
||||
ERR_SCAN_MODE_INVALID = 15
|
||||
|
||||
|
||||
class MoveType(str, enum.Enum):
|
||||
"""Enum class to switch between move types energy and angle for the Bragg positioner"""
|
||||
|
||||
ENERGY = "energy"
|
||||
ANGLE = "angle"
|
||||
|
||||
|
||||
class ScanControlMode(int, enum.Enum):
|
||||
"""Enum class for the scan control mode of the Bragg positioner"""
|
||||
|
||||
SIMPLE = 0
|
||||
ADVANCED = 1
|
||||
@@ -0,0 +1,93 @@
|
||||
"""Module for additional utils of the Mo1 Bragg Positioner"""
|
||||
|
||||
import numpy as np
|
||||
from scipy.interpolate import BSpline
|
||||
|
||||
################ Define Constants ############
|
||||
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
|
||||
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
|
||||
# otherwise peak value will not be included
|
||||
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
|
||||
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
|
||||
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
|
||||
# as used on ACS controller for simple scans
|
||||
|
||||
|
||||
class Mo1UtilsSplineError(Exception):
|
||||
"""Exception for spline computation"""
|
||||
|
||||
|
||||
def compute_spline(
|
||||
low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float
|
||||
) -> tuple[float, float, float]:
|
||||
"""Spline computation for the advanced scan mode
|
||||
|
||||
Args:
|
||||
low_deg (float): Low angle value of the scan in deg
|
||||
high_deg (float): High angle value of the scan in deg
|
||||
scan_time (float): Time for a half oscillation in s
|
||||
p_kink (float): Position of kink in %
|
||||
e_kink_deg (float): Position of kink in degree
|
||||
|
||||
Returns:
|
||||
tuple[float,float,float] : Position, Velocity and delta T arrays for the spline
|
||||
"""
|
||||
|
||||
# increase motion range slightly so that xas trigger signals will occur at defined energy limits
|
||||
low_deg = low_deg - POSITION_COMPONSATION
|
||||
high_deg = high_deg + POSITION_COMPONSATION
|
||||
|
||||
if not (0 <= p_kink <= 100):
|
||||
raise Mo1UtilsSplineError(
|
||||
"Kink position not within range of [0..100%]" + f"for p_kink: {p_kink}"
|
||||
)
|
||||
|
||||
if not (low_deg < e_kink_deg < high_deg):
|
||||
raise Mo1UtilsSplineError(
|
||||
"Kink energy not within selected energy range of scan,"
|
||||
+ f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"
|
||||
+ f"high_deg {high_deg}."
|
||||
)
|
||||
|
||||
tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
|
||||
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + (
|
||||
SAFETY_FACTOR - tc1
|
||||
)
|
||||
|
||||
t_input = [
|
||||
0,
|
||||
SAFETY_FACTOR - tc1,
|
||||
t_kink,
|
||||
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
|
||||
scan_time - TIME_COMPENSATE_SPLINE,
|
||||
]
|
||||
p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg]
|
||||
|
||||
cv = np.stack((t_input, p_input)).T # spline coefficients
|
||||
max_param = len(cv) - DEGREE_SPLINE
|
||||
kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param) # knots
|
||||
spl = BSpline(kv, cv, DEGREE_SPLINE) # get spline function
|
||||
p = spl(np.linspace(0, max_param, N_SAMPLES))
|
||||
v = spl(np.linspace(0, max_param, N_SAMPLES), 1)
|
||||
a = spl(np.linspace(0, max_param, N_SAMPLES), 2)
|
||||
j = spl(np.linspace(0, max_param, N_SAMPLES), 3)
|
||||
|
||||
tim, pos = p.T
|
||||
pos = pos + low_deg
|
||||
vel = v[:, 1] / v[:, 0]
|
||||
|
||||
acc = []
|
||||
for item in a:
|
||||
acc.append(0) if item[1] == 0 else acc.append(item[1] / item[0])
|
||||
jerk = []
|
||||
for item in j:
|
||||
jerk.append(0) if item[1] == 0 else jerk.append(item[1] / item[0])
|
||||
|
||||
dt = np.zeros(len(tim))
|
||||
for i in np.arange(len(tim)):
|
||||
if i == 0:
|
||||
dt[i] = 0
|
||||
else:
|
||||
dt[i] = 1000 * (tim[i] - tim[i - 1])
|
||||
|
||||
return pos, vel, dt
|
||||
@@ -0,0 +1,706 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, cast
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from superxas_bec.devices.nidaq.nidaq_enums import (
|
||||
EncoderFactors,
|
||||
NIDAQCompression,
|
||||
NidaqState,
|
||||
ReadoutRange,
|
||||
ScanRates,
|
||||
ScanType,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class NidaqError(Exception):
|
||||
"""Nidaq specific error"""
|
||||
|
||||
|
||||
class NidaqControl(Device):
|
||||
"""Nidaq control class with all PVs"""
|
||||
|
||||
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
|
||||
smpl_abs = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
|
||||
)
|
||||
ref_abs = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
|
||||
)
|
||||
cisum = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
|
||||
)
|
||||
|
||||
ai0_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
|
||||
)
|
||||
ai1_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
|
||||
)
|
||||
ai2_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
|
||||
)
|
||||
ai3_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
|
||||
)
|
||||
ai4_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
|
||||
)
|
||||
ai5_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
|
||||
)
|
||||
ai6_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
|
||||
)
|
||||
ai7_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
|
||||
)
|
||||
|
||||
di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
|
||||
di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
|
||||
di2_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 2, MAX")
|
||||
di3_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 3, MAX")
|
||||
di4_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 4, MAX")
|
||||
|
||||
ci0_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
|
||||
)
|
||||
ci1_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
|
||||
)
|
||||
ci2_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
|
||||
)
|
||||
ci3_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
|
||||
)
|
||||
ci4_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
|
||||
)
|
||||
ci5_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
|
||||
)
|
||||
ci6_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
|
||||
)
|
||||
ci7_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
|
||||
)
|
||||
ci8_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8, MEAN"
|
||||
)
|
||||
ci9_mean = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9, MEAN"
|
||||
)
|
||||
|
||||
### Readback PVs for EpicsEmitter ###
|
||||
ai0 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI0",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 0",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai1 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI1",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 1",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai2 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI2",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 2",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai3 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI3",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 3",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai4 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI4",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 4",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai5 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI5",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 5",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai6 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI6",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 6",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ai7 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-AI7",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS analog input 7",
|
||||
auto_monitor=True,
|
||||
)
|
||||
|
||||
ci0 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI0",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 0",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci1 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI1",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 1",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci2 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI2",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 2",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci3 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI3",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 3",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci4 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI4",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 4",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci5 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI5",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 5",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci6 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI6",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 6",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci7 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI7",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 7",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci8 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI8",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 8",
|
||||
auto_monitor=True,
|
||||
)
|
||||
ci9 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-CI9",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS counter input 9",
|
||||
auto_monitor=True,
|
||||
)
|
||||
|
||||
di0 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-DI0",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS digital input 0",
|
||||
auto_monitor=True,
|
||||
)
|
||||
di1 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-DI1",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS digital input 1",
|
||||
auto_monitor=True,
|
||||
)
|
||||
di2 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-DI2",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS digital input 2",
|
||||
auto_monitor=True,
|
||||
)
|
||||
di3 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-DI3",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS digital input 3",
|
||||
auto_monitor=True,
|
||||
)
|
||||
di4 = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-DI4",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS digital input 4",
|
||||
auto_monitor=True,
|
||||
)
|
||||
|
||||
enc_epics = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-ENC",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS Encoder reading",
|
||||
auto_monitor=True,
|
||||
)
|
||||
|
||||
energy_epics = Cpt(
|
||||
EpicsSignalRO,
|
||||
suffix="NIDAQ-ENERGY",
|
||||
kind=Kind.normal,
|
||||
doc="EPICS Energy reading",
|
||||
auto_monitor=True,
|
||||
)
|
||||
|
||||
### Readback for BEC emitter ###
|
||||
|
||||
ai0_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
|
||||
)
|
||||
ai1_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
|
||||
)
|
||||
ai2_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
|
||||
)
|
||||
ai3_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
|
||||
)
|
||||
ai4_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
|
||||
)
|
||||
ai5_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
|
||||
)
|
||||
ai6_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
|
||||
)
|
||||
ai7_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
|
||||
)
|
||||
|
||||
ci0_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
|
||||
)
|
||||
ci1_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
|
||||
)
|
||||
ci2_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
|
||||
)
|
||||
ci3_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
|
||||
)
|
||||
ci4_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
|
||||
)
|
||||
ci5_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
|
||||
)
|
||||
ci6_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
|
||||
)
|
||||
ci7_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
|
||||
)
|
||||
ci8_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8. STD"
|
||||
)
|
||||
ci9_std_dev = Cpt(
|
||||
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9. STD"
|
||||
)
|
||||
|
||||
xas_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XAS timestamp")
|
||||
|
||||
# xrd_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD timestamp")
|
||||
# xrd_angle = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD angle")
|
||||
# xrd_energy = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD energy")
|
||||
# xrd_ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 mean")
|
||||
# xrd_ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 std dev")
|
||||
|
||||
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
|
||||
### Control PVs ###
|
||||
|
||||
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
|
||||
# enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
|
||||
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
|
||||
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
|
||||
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config, auto_monitor=True)
|
||||
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
|
||||
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
|
||||
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
|
||||
scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
|
||||
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
|
||||
sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
|
||||
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
|
||||
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
|
||||
readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
|
||||
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
|
||||
encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
|
||||
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
|
||||
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
|
||||
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
|
||||
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
|
||||
|
||||
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
|
||||
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
|
||||
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
|
||||
add_chans = Cpt(EpicsSignal, suffix="NIDAQ-AddChans", kind=Kind.config, auto_monitor=True)
|
||||
|
||||
smpl_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_ln", kind=Kind.config, auto_monitor=True)
|
||||
ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
|
||||
|
||||
smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
|
||||
smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
|
||||
|
||||
smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
|
||||
smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
|
||||
|
||||
ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
|
||||
ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
|
||||
|
||||
ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
|
||||
ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
|
||||
|
||||
|
||||
class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
"""NIDAQ ophyd wrapper around the NIDAQ backend currently running at x10da-nidaq-01
|
||||
|
||||
Args:
|
||||
prefix (str) : Prefix to the NIDAQ soft ioc, currently X10DA-CPCL-SCANSERVER:
|
||||
name (str) : Name of the device
|
||||
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
|
||||
"""
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_config"]
|
||||
|
||||
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.scan_info: ScanInfo
|
||||
self.timeout_wait_for_signal = 5 # put 5s firsts
|
||||
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
|
||||
self.valid_scan_names = [
|
||||
"xas_simple_scan",
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan",
|
||||
"xas_advanced_scan_with_xrd",
|
||||
"nidaq_continuous_scan",
|
||||
]
|
||||
|
||||
########################################
|
||||
# Beamline Methods #
|
||||
########################################
|
||||
|
||||
def _check_if_scan_name_is_valid(self) -> bool:
|
||||
"""Check if the scan is within the list of scans for which the backend is working"""
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
if scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_config(
|
||||
self,
|
||||
sampling_rate: Literal[
|
||||
100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000
|
||||
],
|
||||
ai: list,
|
||||
ci: list,
|
||||
di: list,
|
||||
scan_type: Literal["continuous", "triggered"] = "triggered",
|
||||
scan_duration: float = 0,
|
||||
readout_range: Literal[1, 2, 5, 10] = 10,
|
||||
encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4",
|
||||
enable_compression: bool = True,
|
||||
) -> None:
|
||||
"""Method to configure the NIDAQ
|
||||
|
||||
Args:
|
||||
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
|
||||
10000000, 14286000]): Sampling rate in Hz
|
||||
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
|
||||
otherwise continuous, default 'triggered'
|
||||
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
|
||||
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
|
||||
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
|
||||
enable_compression(bool): Enable or disable compression of data, default True
|
||||
|
||||
"""
|
||||
if sampling_rate == 100000:
|
||||
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
|
||||
elif sampling_rate == 500000:
|
||||
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
|
||||
elif sampling_rate == 1000000:
|
||||
self.sampling_rate.put(ScanRates.ONE_MHZ)
|
||||
elif sampling_rate == 2000000:
|
||||
self.sampling_rate.put(ScanRates.TWO_MHZ)
|
||||
elif sampling_rate == 4000000:
|
||||
self.sampling_rate.put(ScanRates.FOUR_MHZ)
|
||||
elif sampling_rate == 5000000:
|
||||
self.sampling_rate.put(ScanRates.FIVE_MHZ)
|
||||
elif sampling_rate == 10000000:
|
||||
self.sampling_rate.put(ScanRates.TEN_MHZ)
|
||||
elif sampling_rate == 14286000:
|
||||
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
|
||||
|
||||
ai_chans = 0
|
||||
if isinstance(ai, list):
|
||||
for ch in ai:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 7:
|
||||
ai_chans = ai_chans | (1 << ch)
|
||||
self.ai_chans.put(ai_chans)
|
||||
|
||||
ci_chans = 0
|
||||
if isinstance(ci, list):
|
||||
for ch in ci:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 7:
|
||||
ci_chans = ci_chans | (1 << ch)
|
||||
self.ci_chans.put(ci_chans)
|
||||
|
||||
di_chans = 0
|
||||
if isinstance(di, list):
|
||||
for ch in di:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 4:
|
||||
di_chans = di_chans | (1 << ch)
|
||||
self.di_chans.put(di_chans)
|
||||
|
||||
if scan_type in "continuous":
|
||||
self.scan_type.put(ScanType.CONTINUOUS)
|
||||
elif scan_type in "triggered":
|
||||
self.scan_type.put(ScanType.TRIGGERED)
|
||||
|
||||
if scan_duration >= 0:
|
||||
self.scan_duration.put(scan_duration)
|
||||
|
||||
if readout_range == 1:
|
||||
self.readout_range.put(ReadoutRange.ONE_V)
|
||||
elif readout_range == 2:
|
||||
self.readout_range.put(ReadoutRange.TWO_V)
|
||||
elif readout_range == 5:
|
||||
self.readout_range.put(ReadoutRange.FIVE_V)
|
||||
elif readout_range == 10:
|
||||
self.readout_range.put(ReadoutRange.TEN_V)
|
||||
|
||||
if encoder_type in "1/16":
|
||||
self.encoder_factor.put(EncoderFactors.X1_16)
|
||||
elif encoder_type in "1/8":
|
||||
self.encoder_factor.put(EncoderFactors.X1_8)
|
||||
elif encoder_type in "1/4":
|
||||
self.encoder_factor.put(EncoderFactors.X1_4)
|
||||
elif encoder_type in "1/2":
|
||||
self.encoder_factor.put(EncoderFactors.X1_2)
|
||||
elif encoder_type in "1":
|
||||
self.encoder_factor.put(EncoderFactors.X1)
|
||||
elif encoder_type in "2":
|
||||
self.encoder_factor.put(EncoderFactors.X2)
|
||||
elif encoder_type in "4":
|
||||
self.encoder_factor.put(EncoderFactors.X4)
|
||||
|
||||
if enable_compression is True:
|
||||
self.enable_compression.put(NIDAQCompression.ON)
|
||||
elif enable_compression is False:
|
||||
self.enable_compression.put(NIDAQCompression.OFF)
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
|
||||
except WaitTimeoutError:
|
||||
logger.warning(f"Device {self.name} was not alive, trying to put power on")
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
self.power.put(1)
|
||||
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.time_left.subscribe(self._progress_update, run=False)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
If the upcoming scan is not in the list of valid scans, return immediately.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
|
||||
if self.state.get() != NidaqState.STANDBY:
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
self.on_stop()
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
# If scan is not part of the valid_scan_names,
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
else:
|
||||
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
|
||||
# Stage call to IOC
|
||||
status = CompareStatus(self.state, NidaqState.STAGE)
|
||||
self.cancel_on_stop(status)
|
||||
# TODO 11.11.25/HS64
|
||||
# Switched from set to put in the hope to get rid of the rare event where nidaq is stopped at the start of a scan
|
||||
# Problems consistently persisting, testing changing back to set, unconvinced this is the actual cause 14.11.25/AHC
|
||||
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.stage_call.put(1)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
status = self.on_kickoff()
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase:
|
||||
"""Kickoff the Nidaq"""
|
||||
status = self.kickoff_call.set(1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
status = self.enable_compression.set(1)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self._timeout_wait_for_pv)
|
||||
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called right before the scan starts on all devices automatically.
|
||||
|
||||
Here we ensure that the NIDAQ master task is running
|
||||
before the motor starts its oscillation. This is needed for being properly homed.
|
||||
The NIDAQ should go into Acquiring mode.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
|
||||
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
|
||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||
return None
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.KICKOFF)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
logger.info(
|
||||
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
|
||||
)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called to inquire if a device has completed a scans.
|
||||
|
||||
For the NIDAQ we use this method to stop the backend since it
|
||||
would not stop by itself in its current implementation since the number of points are not predefined.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
self.on_stop()
|
||||
return status
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
|
||||
Args:
|
||||
value (int) : current progress value
|
||||
"""
|
||||
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
|
||||
if not isinstance(scan_duration, (int, float)):
|
||||
return
|
||||
value = scan_duration - value
|
||||
max_value = scan_duration
|
||||
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.stop_call.put(1)
|
||||
@@ -0,0 +1,60 @@
|
||||
import enum
|
||||
|
||||
|
||||
class NIDAQCompression(str, enum.Enum):
|
||||
"""Options for Compression"""
|
||||
|
||||
OFF = 0
|
||||
ON = 1
|
||||
|
||||
|
||||
class ScanType(int, enum.Enum):
|
||||
"""Triggering options of the backend"""
|
||||
|
||||
TRIGGERED = 0
|
||||
CONTINUOUS = 1
|
||||
|
||||
|
||||
class NidaqState(int, enum.Enum):
|
||||
"""Possible States of the NIDAQ backend"""
|
||||
|
||||
DISABLED = 0
|
||||
STANDBY = 1
|
||||
STAGE = 2
|
||||
KICKOFF = 3
|
||||
ACQUIRE = 4
|
||||
UNSTAGE = 5
|
||||
|
||||
|
||||
class ScanRates(int, enum.Enum):
|
||||
"""Sampling Rate options for the backend, in kHZ and MHz"""
|
||||
|
||||
HUNDRED_KHZ = 0
|
||||
FIVE_HUNDRED_KHZ = 1
|
||||
ONE_MHZ = 2
|
||||
TWO_MHZ = 3
|
||||
FOUR_MHZ = 4
|
||||
FIVE_MHZ = 5
|
||||
TEN_MHZ = 6
|
||||
FOURTEEN_THREE_MHZ = 7
|
||||
|
||||
|
||||
class ReadoutRange(int, enum.Enum):
|
||||
"""ReadoutRange in +-V"""
|
||||
|
||||
ONE_V = 0
|
||||
TWO_V = 1
|
||||
FIVE_V = 2
|
||||
TEN_V = 3
|
||||
|
||||
|
||||
class EncoderFactors(int, enum.Enum):
|
||||
"""Encoder Factors"""
|
||||
|
||||
X1_16 = 0
|
||||
X1_8 = 1
|
||||
X1_4 = 2
|
||||
X1_2 = 3
|
||||
X1 = 4
|
||||
X2 = 5
|
||||
X4 = 6
|
||||
@@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import ADBase
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
|
||||
from ophyd import Signal
|
||||
from ophyd_devices import (
|
||||
AsyncSignal,
|
||||
CompareStatus,
|
||||
DeviceStatus,
|
||||
EpicsSignalRO,
|
||||
EpicsSignalWithRBV,
|
||||
ExceptionStatus,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
StatusBase,
|
||||
@@ -134,6 +137,13 @@ class FILEWRITEMODE(int, enum.Enum):
|
||||
STREAM = 2
|
||||
|
||||
|
||||
class IMAGEMODE(int, enum.Enum):
|
||||
"""Values for ImageMode PV"""
|
||||
|
||||
SINGLE = 0
|
||||
MULTIPLE = 1
|
||||
|
||||
|
||||
def load_pixel_map_from_json(file_path: str) -> PixelMap:
|
||||
"""Load a pixel map from a JSON file.
|
||||
|
||||
@@ -198,6 +208,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
"set_pixel_map",
|
||||
"set_pixel_map_from_json_file",
|
||||
"set_enable_xes",
|
||||
"set_enable_image_writing",
|
||||
]
|
||||
|
||||
xes_data = Cpt(
|
||||
@@ -511,6 +522,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
"""
|
||||
self.enable_xes = enable
|
||||
|
||||
def set_enable_image_writing(self, enable: bool) -> None:
|
||||
"""
|
||||
Enable or disable image writing to file through the HDF5 plugin.
|
||||
|
||||
Args:
|
||||
enable (bool): Whether to enable image writing.
|
||||
"""
|
||||
self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout)
|
||||
|
||||
@property
|
||||
def enable_xes(self) -> bool:
|
||||
"""Get whether XES data acquisition is enabled."""
|
||||
@@ -610,6 +630,13 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
|
||||
|
||||
def wait_for_connection(
|
||||
self, all_signals: bool = False, timeout: float | None = None, **kwargs
|
||||
):
|
||||
super().wait_for_connection(all_signals, timeout)
|
||||
# Prepare backend for TimePixFly
|
||||
self.backend.on_connected()
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
@@ -624,6 +651,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
|
||||
# Reset array counter on connect
|
||||
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
|
||||
# Set image mode to multiple
|
||||
self.cam.image_mode.set(IMAGEMODE.MULTIPLE).wait(timeout=self._pv_timeout)
|
||||
|
||||
# ------------------
|
||||
# Prepare file writing through AD HDF5 plugin
|
||||
@@ -639,8 +668,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
# Prepare TimePixFly backend
|
||||
# -----------------
|
||||
|
||||
# Prepare backend for TimePixFly
|
||||
self.backend.on_connected()
|
||||
# Register the callback for processing data received by the backend
|
||||
self.backend.add_callback(self.msg_buffer_callback)
|
||||
self._poll_thread.start()
|
||||
@@ -693,18 +720,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
file_path = "/".join(self._full_path.split("/")[:-1])
|
||||
file_name = self._full_path.split("/")[-1]
|
||||
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
|
||||
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
|
||||
self.hdf.file_path.set(file_path).wait(5)
|
||||
self.hdf.file_name.set(file_name).wait(5)
|
||||
# Setup file writing for the total expected number of images
|
||||
self.hdf.num_capture.set(self._n_images).wait(5)
|
||||
self.hdf.capture.put(1)
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
if self.hdf.enable.get() == "Enable":
|
||||
self.hdf.capture.put(1)
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
|
||||
# -------------------------
|
||||
# XES specific staging
|
||||
@@ -740,11 +767,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
status_camera = CompareStatus(
|
||||
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
|
||||
)
|
||||
status_writer = CompareStatus(
|
||||
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
status = status_camera & status_writer
|
||||
|
||||
status_writer = None
|
||||
if self.hdf.enable.get() == "Enable":
|
||||
status_writer = CompareStatus(
|
||||
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
if status_writer:
|
||||
status = status_camera & status_writer
|
||||
else:
|
||||
status = status_camera
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
@@ -799,12 +830,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
# Status Camera
|
||||
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
# Status Writer
|
||||
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
|
||||
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
|
||||
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
|
||||
status_writer = st1 & st2 & status_written_images
|
||||
status_writer = None
|
||||
if self.hdf.enable.get() == "Enable":
|
||||
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
|
||||
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
|
||||
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
|
||||
status_writer = st1 & st2 & status_written_images & st3
|
||||
|
||||
# Status Backend
|
||||
status_backend = None
|
||||
@@ -813,9 +847,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
status_backend = self.backend.on_complete(status=status_backend)
|
||||
# Combine the statuses
|
||||
if status_backend is not None:
|
||||
return_status = status_backend & status_camera & status_writer
|
||||
else:
|
||||
return_status = status_camera & status_writer
|
||||
return_status = status_backend & return_status
|
||||
if status_writer is not None:
|
||||
return_status = return_status & status_writer
|
||||
|
||||
return_status.add_callback(self._complete_callback)
|
||||
self.cancel_on_stop(return_status)
|
||||
@@ -823,6 +857,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
|
||||
def _complete_callback(self, status: CompareStatus) -> None:
|
||||
"""Callback for when the device completes a scan."""
|
||||
if (
|
||||
self.hdf.enable.get() == "Enable"
|
||||
): # TODO: Not sure if we should support disabled file writing.
|
||||
return
|
||||
if status.success:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
@@ -849,11 +887,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
|
||||
def on_destroy(self):
|
||||
"""Cleanup method to stop the device and clean up resources."""
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
self._poll_thread_kill_event.set()
|
||||
self.backend.on_stop()
|
||||
self.backend.on_destroy()
|
||||
try:
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
self._poll_thread_kill_event.set()
|
||||
self.backend.on_stop()
|
||||
self.backend.on_destroy()
|
||||
except Exception:
|
||||
logger.warning(f"Failed to destroy {self.name}.")
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
@@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage',
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
@@ -81,14 +80,14 @@ class TimepixFlyBackend:
|
||||
###### Hooks for the PSIDeviceBase interface ######
|
||||
###################################################
|
||||
|
||||
def on_connected(self):
|
||||
def on_connected(self, timeout: float = 10):
|
||||
"""Called if it is ensured that the device is connected."""
|
||||
time_started = time.time()
|
||||
logger.info("Connecting to Timepix Fly backend...")
|
||||
try:
|
||||
self.timepix_fly_client.on_connected()
|
||||
self.timepix_fly_client.on_connected(timeout=timeout / 2)
|
||||
status = self.start_data_server()
|
||||
status.wait(timeout=5)
|
||||
status.wait(timeout=timeout / 2)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error starting data server: {content}")
|
||||
@@ -239,6 +238,7 @@ class TimepixFlyBackend:
|
||||
|
||||
def on_destroy(self):
|
||||
"""Hook for on_destroy logic."""
|
||||
time_started = time.time()
|
||||
self.timepix_fly_client.shutdown()
|
||||
self._data_thread_shutdown_event.set()
|
||||
if self._data_thread is not None and self._data_thread.is_alive():
|
||||
@@ -255,6 +255,9 @@ class TimepixFlyBackend:
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error closing socket server: {content}")
|
||||
logger.info(
|
||||
f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds."
|
||||
)
|
||||
|
||||
def on_stop(self):
|
||||
"""Hook for on_stop logic."""
|
||||
|
||||
@@ -88,7 +88,7 @@ class TimepixFlyClient:
|
||||
### Utility Methods ###
|
||||
#############################
|
||||
|
||||
def on_connected(self) -> None:
|
||||
def on_connected(self, timeout: float = 5) -> None:
|
||||
"""
|
||||
Called when the client is connected to the TimePix server.
|
||||
This method can be overridden to perform actions when the client connects.
|
||||
@@ -96,7 +96,7 @@ class TimepixFlyClient:
|
||||
try:
|
||||
self.stop_running_collection()
|
||||
self.connect()
|
||||
self.wait_for_connection(timeout=5)
|
||||
self.wait_for_connection(timeout=timeout)
|
||||
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
@@ -270,16 +270,18 @@ class TimepixFlyClient:
|
||||
continue
|
||||
if status in success:
|
||||
dev_status.set_finished()
|
||||
logger.debug(f"Status callback finished in succes: {status.value}")
|
||||
logger.debug(f"Status callback finished in success: {status.value}")
|
||||
self._status_callbacks.pop(cb_id)
|
||||
elif status in error:
|
||||
try:
|
||||
last_error = self.last_error()
|
||||
raise TimePixStatusError(
|
||||
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
|
||||
f"TimePixFly state '{status.value}': {last_error.message}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
|
||||
logger.error(
|
||||
f"Error in status callback for '{status.value}' from TimepixFly backend: {e}"
|
||||
)
|
||||
dev_status.set_exception(e)
|
||||
self._status_callbacks.pop(cb_id)
|
||||
# Reset the _started flag if the status is in CONFIG.
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .superxas_nexus_structure import SuperXASNexusStructure
|
||||
@@ -0,0 +1,308 @@
|
||||
from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
import superxas_bec.bec_widgets.widgets.x10da_parameters as bl
|
||||
|
||||
class SuperXASNexusStructure(DefaultFormat):
|
||||
"""Nexus Structure for SuperXAS"""
|
||||
|
||||
def format(self) -> None:
|
||||
"""Specify the file format for the file writer."""
|
||||
|
||||
entry = self.storage.create_group(name="entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
instrument = entry.create_group(name="instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
|
||||
##################
|
||||
## source specific information
|
||||
###################
|
||||
|
||||
source = instrument.create_group(name="source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
|
||||
beamline_name = source.create_dataset(name="beamline_name", data="SuperXAS")
|
||||
beamline_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
|
||||
facility_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
probe = source.create_dataset(name="probe", data="X-ray")
|
||||
probe.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if "curr" in self.device_manager.devices:
|
||||
ring_current = source.create_soft_link(
|
||||
name="ring_current",
|
||||
target="/entry/collection/devices/curr/curr/value",
|
||||
)
|
||||
ring_current.attrs["NX_class"] = "NX_FLOAT"
|
||||
ring_current.attrs["units"] = "mA"
|
||||
|
||||
###################
|
||||
## mo1_bragg specific information
|
||||
###################
|
||||
|
||||
## Logic if device exist
|
||||
if "mo1_bragg" in self.device_manager.devices:
|
||||
|
||||
monochromator = instrument.create_group(name="monochromator")
|
||||
monochromator.attrs["NX_class"] = "NXmonochromator"
|
||||
crystal = monochromator.create_group(name="crystal")
|
||||
crystal.attrs["NX_class"] = "NXcrystal"
|
||||
|
||||
# Create a dataset
|
||||
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
|
||||
chemical_formular.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
reflection = crystal.create_soft_link(
|
||||
name="reflection",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
|
||||
)
|
||||
reflection.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
# Create a softlink
|
||||
d_spacing = crystal.create_soft_link(
|
||||
name="d_spacing",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
|
||||
)
|
||||
d_spacing.attrs["NX_class"] = "NX_FLOAT"
|
||||
d_spacing.attrs["units"] = "angstrom"
|
||||
|
||||
bragg_offset = crystal.create_soft_link(
|
||||
name="bragg_offset",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_bragg_off/value",
|
||||
)
|
||||
bragg_offset.attrs["NX_class"] = "NX_FLOAT"
|
||||
bragg_offset.attrs["units"] = "degree"
|
||||
|
||||
###################
|
||||
### cm mirror specific information
|
||||
####################
|
||||
|
||||
collimating_mirror = instrument.create_group(name="collimating_mirror")
|
||||
collimating_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
cm_substrate_material = collimating_mirror.create_dataset(
|
||||
name="substrate_material", data="Si"
|
||||
)
|
||||
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
#previous error due to space in name field
|
||||
|
||||
if "cm_bnd" in self.device_manager.devices:
|
||||
cm_bending = collimating_mirror.create_soft_link(
|
||||
name="sagittal_radius_bender_motor",
|
||||
target="/entry/collection/devices/cm_bnd/cm_bnd/value",
|
||||
)
|
||||
cm_bending.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_bending.attrs["units"] = "steps"
|
||||
|
||||
if "cm_rotx" in self.device_manager.devices:
|
||||
cm_incidence_angle = collimating_mirror.create_soft_link(
|
||||
name="incidence_angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
|
||||
)
|
||||
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_incidence_angle.attrs["units"] = "mrad"
|
||||
|
||||
if "cm_roty" in self.device_manager.devices:
|
||||
cm_yaw_angle = collimating_mirror.create_soft_link(
|
||||
name="yaw_angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
|
||||
)
|
||||
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_yaw_angle.attrs["units"] = "mrad"
|
||||
|
||||
if "cm_rotz" in self.device_manager.devices:
|
||||
cm_roll_angle = collimating_mirror.create_soft_link(
|
||||
name="roll_angle", target="/entry/collection/devices/cm_rotz/cm_rotz/value"
|
||||
)
|
||||
cm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_roll_angle.attrs["units"] = "mrad"
|
||||
|
||||
if 'cm_trx' in self.device_manager.devices:
|
||||
cm_trx = - self.device_manager.devices.cm_trx.read(cached=True).get('cm_trx').get('value')
|
||||
stripe = 'Unknown'
|
||||
for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
|
||||
if low <= cm_trx <= high:
|
||||
stripe = name
|
||||
cm_stripe = collimating_mirror.create_dataset(
|
||||
name="stripe", data=stripe
|
||||
)
|
||||
cm_stripe.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
###################
|
||||
### fm mirror specific information
|
||||
####################
|
||||
|
||||
focusing_mirror = instrument.create_group(name="focusing_mirror")
|
||||
focusing_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
fm_substrate_material = focusing_mirror.create_dataset(
|
||||
name="substrate_material", data="Si"
|
||||
)
|
||||
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if "fm_bnd" in self.device_manager.devices:
|
||||
fm_bending = focusing_mirror.create_soft_link(
|
||||
name="sagittal_radius_bender_motor",
|
||||
target="/entry/collection/devices/fm_bnd/fm_bnd/value",
|
||||
)
|
||||
fm_bending.attrs["NX_class"] = "NX_FLOAT"
|
||||
fm_bending.attrs["units"] = "steps"
|
||||
|
||||
if "fm_rotx" in self.device_manager.devices:
|
||||
fm_incidence_angle = focusing_mirror.create_soft_link(
|
||||
name="incidence_angle", target="/entry/collection/devices/fm_rotx/fm_rotx/value"
|
||||
)
|
||||
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
fm_incidence_angle.attrs["units"] = "mrad"
|
||||
|
||||
if "fm_roty" in self.device_manager.devices:
|
||||
fm_yaw_angle = focusing_mirror.create_soft_link(
|
||||
name="yaw_angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
|
||||
)
|
||||
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
fm_yaw_angle.attrs["units"] = "mrad"
|
||||
|
||||
if "fm_rotz" in self.device_manager.devices:
|
||||
fm_roll_angle = focusing_mirror.create_soft_link(
|
||||
name="roll_angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
|
||||
)
|
||||
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
fm_roll_angle.attrs["units"] = "mrad"
|
||||
|
||||
if 'fm_trx' in self.device_manager.devices:
|
||||
fm_trx = - self.device_manager.devices.fm_trx.read(cached=True).get('fm_trx').get('value')
|
||||
stripe = 'Unknown'
|
||||
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
|
||||
if low <= fm_trx <= high:
|
||||
stripe = name + ' (toroid)'
|
||||
fm_stripe = focusing_mirror.create_dataset(
|
||||
name="stripe", data=stripe
|
||||
)
|
||||
fm_stripe.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
###################
|
||||
## nidaq specific information
|
||||
###################
|
||||
|
||||
## Logic if device exist
|
||||
if "nidaq" in self.device_manager.devices:
|
||||
|
||||
#ai_chans_bits = self.device_manager.devices.nidaq.ai_chans.read(cached=True).get("nidaq_ai_chans").get("value")
|
||||
ai_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ai_chans", {}).get("value")
|
||||
ci_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ci_chans", {}).get("value")
|
||||
#add_chans_bits = self.device_manager.devices.nidaq.add_chans.read(cached=True).get("nidaq_add_chans").get("value")
|
||||
add_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
|
||||
|
||||
measurement_mode = entry.create_group(name="mode")
|
||||
measurement_mode.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if (int(ci_chans_bits) & 0x7F) != 0:
|
||||
# Create a dataset
|
||||
rayspec_sdd_active = measurement_mode.create_group(name="Multi_Element_Partial_Fluorescence_Yield")
|
||||
me_sdd = rayspec_sdd_active.create_dataset(name="Detector", data="Rayspec 7 element Silicon Drift Detector")
|
||||
me_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if (int(ci_chans_bits) & (1<<8)) != 0:
|
||||
# Create a dataset
|
||||
ketek_sdd_active = measurement_mode.create_group(name="Single_Element_Partial_Fluorescence_Yield")
|
||||
se_sdd = ketek_sdd_active.create_dataset(name="Detector", data="Ketex mini single element Silicon Drift Detector")
|
||||
se_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if ((int(ai_chans_bits) & (1<<6)) != 0):
|
||||
# Create a dataset
|
||||
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
|
||||
tfy = pips_active.create_dataset(name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector")
|
||||
tfy.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if ((int(ai_chans_bits) & (1<<0)) != 0) & ((int(ai_chans_bits) & (1<<2)) != 0):
|
||||
# Create a dataset
|
||||
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
|
||||
sam_trans = ai0ai2_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
|
||||
sam_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if ((int(ai_chans_bits) & (1<<2)) != 0) & ((int(ai_chans_bits) & (1<<4)) != 0):
|
||||
# Create a dataset
|
||||
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
|
||||
ref_trans = ai2ai4_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
|
||||
ref_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
main_data = entry.create_group(name="data")
|
||||
main_data.attrs["NX_class"] = "NXdata"
|
||||
|
||||
##################
|
||||
## energy, test whether the signal exists. how to check from config?
|
||||
###################
|
||||
|
||||
energy = main_data.create_group(name="energy")
|
||||
energy.attrs["NX_class"] = "NXdata"
|
||||
energy.attrs["units"] = "eV"
|
||||
|
||||
main_data.create_soft_link(name="energy", target="/entry/collection/readout_groups/async/nidaq/nidaq_energy/value")
|
||||
|
||||
##################
|
||||
## i0
|
||||
###################
|
||||
|
||||
if (int(ai_chans_bits) & (1<<0)) !=0:
|
||||
i0 = main_data.create_group(name="i0")
|
||||
i0.attrs["NX_class"] = "NXdata"
|
||||
i0.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(name="i0", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value")
|
||||
|
||||
##################
|
||||
## i1
|
||||
###################
|
||||
|
||||
if (int(ai_chans_bits) & (1<<2)) !=0:
|
||||
i1 = main_data.create_group(name="i1")
|
||||
i1.attrs["NX_class"] = "NXdata"
|
||||
i1.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(name="i1", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value")
|
||||
|
||||
##################
|
||||
## i2
|
||||
###################
|
||||
|
||||
if (int(ai_chans_bits) & (1<<4)) !=0:
|
||||
i2 = main_data.create_group(name="i2")
|
||||
i2.attrs["NX_class"] = "NXdata"
|
||||
i2.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(name="i2", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value")
|
||||
|
||||
##################
|
||||
## ci sum
|
||||
###################
|
||||
|
||||
if int(ci_chans_bits) > 0:
|
||||
ci_sum = main_data.create_group(name="Fluorescence_Sum")
|
||||
ci_sum.attrs["NX_class"] = "NXdata"
|
||||
ci_sum.attrs["units"] = "counts"
|
||||
|
||||
main_data.create_soft_link(name="Fluorescence_Sum", target="/entry/collection/readout_groups/async/nidaq/nidaq_cisum/value")
|
||||
|
||||
##################
|
||||
## mu sample, test whether the signal exists. how to check from config?
|
||||
###################
|
||||
|
||||
if (int(add_chans_bits) & (1<<0)) !=0:
|
||||
mu_sample = main_data.create_group(name="mu_sample")
|
||||
mu_sample.attrs["NX_class"] = "NXdata"
|
||||
|
||||
main_data.create_soft_link(name="mu_sample", target="/entry/collection/readout_groups/async/nidaq/nidaq_smpl_abs/value")
|
||||
|
||||
##################
|
||||
## mu reference, test whether the signal exists. how to check from config?
|
||||
###################
|
||||
|
||||
if (int(add_chans_bits) & (1<<1)) !=0:
|
||||
mu_reference = main_data.create_group(name="mu_reference")
|
||||
mu_reference.attrs["NX_class"] = "NXdata"
|
||||
|
||||
main_data.create_soft_link(name="mu_reference", target="/entry/collection/readout_groups/async/nidaq/nidaq_ref_abs/value")
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1 +1,9 @@
|
||||
from .exafs_scan import EXAFSScan
|
||||
from .exafs_scan import EXAFSScan
|
||||
|
||||
from .mono_bragg_scans import (
|
||||
XASAdvancedScan,
|
||||
# XASAdvancedScanWithXRD,
|
||||
XASSimpleScan,
|
||||
# XASSimpleScanWithXRD,
|
||||
)
|
||||
from .nidaq_cont_scan import NIDAQContinuousScan
|
||||
@@ -1,6 +1,6 @@
|
||||
# from .metadata_schema_template import ExampleSchema
|
||||
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
||||
|
||||
METADATA_SCHEMA_REGISTRY = {
|
||||
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
|
||||
# Add models which should be used to validate scan metadata here.
|
||||
# Make a model according to the template, and import it as above
|
||||
# Then associate it with a scan like so:
|
||||
@@ -9,4 +9,4 @@ METADATA_SCHEMA_REGISTRY = {
|
||||
|
||||
# Define a default schema type which should be used as the fallback for everything:
|
||||
|
||||
DEFAULT_SCHEMA = None
|
||||
DEFAULT_SCHEMA = None
|
||||
@@ -0,0 +1,8 @@
|
||||
from bec_lib.metadata_schema import BasicScanMetadata
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
class xas_simple_scan_schema(BasicScanMetadata):
|
||||
Edge: str
|
||||
Element: str
|
||||
@@ -0,0 +1,338 @@
|
||||
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class XASSimpleScan(AsyncFlyScanBase):
|
||||
"""Class for the XAS simple scan"""
|
||||
|
||||
scan_name = "xas_simple_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan
|
||||
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
|
||||
scan will run infinitely.
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
scan_time (float): Time for one scan cycle.
|
||||
scan_duration (float): Duration of the scan.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
Examples:
|
||||
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.motor = motor
|
||||
self.start = start
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.primary_readout_cycle = 1
|
||||
|
||||
def stage(self):
|
||||
"""call the stage procedure"""
|
||||
# Compute position for mo1_gonio pre move
|
||||
# Since energy is not linear to angle, we have to calculate the angles first.
|
||||
pos_start = yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_bragg",
|
||||
"convert_angle_energy",
|
||||
mode = "EnergyToAngle",
|
||||
inp = self.start,
|
||||
)
|
||||
pos_end = yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_bragg",
|
||||
"convert_angle_energy",
|
||||
mode = "EnergyToAngle",
|
||||
inp = self.stop,
|
||||
)
|
||||
# Goniometer position is in the middle of the start and stop angle of the scan
|
||||
pos = (pos_start + pos_end) / 2
|
||||
|
||||
# Premove with mo1_gonio
|
||||
yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_gonio",
|
||||
"move",
|
||||
position = pos,
|
||||
wait = True,
|
||||
timeout = 30, # 30 seconds timeout
|
||||
)
|
||||
# Continue with staging the devices
|
||||
yield from self.stubs.stage()
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan.
|
||||
|
||||
Use here only start and end energy defining the range for the scan.
|
||||
"""
|
||||
self.positions = np.array([self.start, self.stop], dtype=float)
|
||||
self.num_pos = None
|
||||
yield None
|
||||
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self._check_limits()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
|
||||
"""
|
||||
# Start the oscillation on the Bragg motor.
|
||||
yield from self.stubs.kickoff(device=self.motor)
|
||||
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
self.num_pos = self.point_id
|
||||
|
||||
|
||||
# class XASSimpleScanWithXRD(XASSimpleScan):
|
||||
# """Class for the XAS simple scan with XRD"""
|
||||
|
||||
# scan_name = "xas_simple_scan_with_xrd"
|
||||
# gui_config = {
|
||||
# "Movement Parameters": ["start", "stop"],
|
||||
# "Scan Parameters": ["scan_time", "scan_duration"],
|
||||
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
|
||||
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
|
||||
# }
|
||||
|
||||
# def __init__(
|
||||
# self,
|
||||
# start: float,
|
||||
# stop: float,
|
||||
# scan_time: float,
|
||||
# scan_duration: float,
|
||||
# xrd_enable_low: bool,
|
||||
# num_trigger_low: int,
|
||||
# exp_time_low: float,
|
||||
# cycle_low: int,
|
||||
# xrd_enable_high: bool,
|
||||
# num_trigger_high: int,
|
||||
# exp_time_high: float,
|
||||
# cycle_high: float,
|
||||
# motor: DeviceBase = "mo1_bragg",
|
||||
# **kwargs,
|
||||
# ):
|
||||
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
|
||||
# with XRD triggering at low and high energy ranges.
|
||||
# If scan duration is set to 0, the scan will run infinitely.
|
||||
|
||||
# Args:
|
||||
# start (float): Start energy for the scan.
|
||||
# stop (float): Stop energy for the scan.
|
||||
# scan_time (float): Time for one oscillation .
|
||||
# scan_duration (float): Total duration of the scan.
|
||||
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
|
||||
# num_trigger_low (int): Number of triggers for the low energy range.
|
||||
# exp_time_low (float): Exposure time for the low energy range.
|
||||
# cycle_low (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for low
|
||||
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
|
||||
# num_trigger_high (int): Number of triggers for the high energy range.
|
||||
# exp_time_high (float): Exposure time for the high energy range.
|
||||
# cycle_high (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for high
|
||||
# motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
# Defaults to "mo1_bragg".
|
||||
|
||||
# Examples:
|
||||
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
# """
|
||||
# super().__init__(
|
||||
# start=start,
|
||||
# stop=stop,
|
||||
# scan_time=scan_time,
|
||||
# scan_duration=scan_duration,
|
||||
# motor=motor,
|
||||
# **kwargs,
|
||||
# )
|
||||
# self.xrd_enable_low = xrd_enable_low
|
||||
# self.num_trigger_low = num_trigger_low
|
||||
# self.exp_time_low = exp_time_low
|
||||
# self.cycle_low = cycle_low
|
||||
# self.xrd_enable_high = xrd_enable_high
|
||||
# self.num_trigger_high = num_trigger_high
|
||||
# self.exp_time_high = exp_time_high
|
||||
# self.cycle_high = cycle_high
|
||||
|
||||
|
||||
class XASAdvancedScan(XASSimpleScan):
|
||||
"""Class for the XAS advanced scan"""
|
||||
|
||||
scan_name = "xas_advanced_scan"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
p_kink: float,
|
||||
e_kink: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_advanced_scan is an oscillation motion on the mono motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the
|
||||
time for one scan cycle and scan_duration is the duration of the scan.
|
||||
If scan duration is set to 0, the scan will run infinitely.
|
||||
p_kink and e_kink add a kink to the motion profile to slow down in the
|
||||
exafs region of the scan.
|
||||
|
||||
Args:
|
||||
start (float): Start angle for the scan.
|
||||
stop (float): Stop angle for the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
p_kink (float): Position of the kink.
|
||||
e_kink (float): Energy of the kink.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
|
||||
Examples:
|
||||
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
|
||||
"""
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
**kwargs,
|
||||
)
|
||||
self.p_kink = p_kink
|
||||
self.e_kink = e_kink
|
||||
|
||||
|
||||
# class XASAdvancedScanWithXRD(XASAdvancedScan):
|
||||
# """Class for the XAS advanced scan with XRD"""
|
||||
|
||||
# scan_name = "xas_advanced_scan_with_xrd"
|
||||
# gui_config = {
|
||||
# "Movement Parameters": ["start", "stop"],
|
||||
# "Scan Parameters": ["scan_time", "scan_duration"],
|
||||
# "Spline Parameters": ["p_kink", "e_kink"],
|
||||
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
|
||||
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
|
||||
# }
|
||||
|
||||
# def __init__(
|
||||
# self,
|
||||
# start: float,
|
||||
# stop: float,
|
||||
# scan_time: float,
|
||||
# scan_duration: float,
|
||||
# p_kink: float,
|
||||
# e_kink: float,
|
||||
# xrd_enable_low: bool,
|
||||
# num_trigger_low: int,
|
||||
# exp_time_low: float,
|
||||
# cycle_low: int,
|
||||
# xrd_enable_high: bool,
|
||||
# num_trigger_high: int,
|
||||
# exp_time_high: float,
|
||||
# cycle_high: float,
|
||||
# motor: DeviceBase = "mo1_bragg",
|
||||
# **kwargs,
|
||||
# ):
|
||||
# """The xas_advanced_scan is an oscillation motion on the mono motor
|
||||
# with XRD triggering at low and high energy ranges.
|
||||
# Start and Stop define the energy range for the scan, scan_time is the time for
|
||||
# one scan cycle and scan_duration is the duration of the scan. If scan duration
|
||||
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
|
||||
# motion profile to slow down in the exafs region of the scan.
|
||||
|
||||
# Args:
|
||||
# start (float): Start angle for the scan.
|
||||
# stop (float): Stop angle for the scan.
|
||||
# scan_time (float): Time for one oscillation .
|
||||
# scan_duration (float): Total duration of the scan.
|
||||
# p_kink (float): Position of kink.
|
||||
# e_kink (float): Energy of the kink.
|
||||
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
|
||||
# num_trigger_low (int): Number of triggers for the low energy range.
|
||||
# exp_time_low (float): Exposure time for the low energy range.
|
||||
# cycle_low (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for low
|
||||
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
|
||||
# num_trigger_high (int): Number of triggers for the high energy range.
|
||||
# exp_time_high (float): Exposure time for the high energy range.
|
||||
# cycle_high (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for high
|
||||
# motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
# Defaults to "mo1_bragg".
|
||||
|
||||
# Examples:
|
||||
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
# """
|
||||
# super().__init__(
|
||||
# start=start,
|
||||
# stop=stop,
|
||||
# scan_time=scan_time,
|
||||
# scan_duration=scan_duration,
|
||||
# p_kink=p_kink,
|
||||
# e_kink=e_kink,
|
||||
# motor=motor,
|
||||
# **kwargs,
|
||||
# )
|
||||
# self.p_kink = p_kink
|
||||
# self.e_kink = e_kink
|
||||
# self.xrd_enable_low = xrd_enable_low
|
||||
# self.num_trigger_low = num_trigger_low
|
||||
# self.exp_time_low = exp_time_low
|
||||
# self.cycle_low = cycle_low
|
||||
# self.xrd_enable_high = xrd_enable_high
|
||||
# self.num_trigger_high = num_trigger_high
|
||||
# self.exp_time_high = exp_time_high
|
||||
# self.cycle_high = cycle_high
|
||||
@@ -0,0 +1,84 @@
|
||||
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class NIDAQContinuousScan(AsyncFlyScanBase):
|
||||
"""Class for the nidaq continuous scan (without mono)"""
|
||||
|
||||
scan_name = "nidaq_continuous_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
|
||||
|
||||
def __init__(
|
||||
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
|
||||
):
|
||||
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
|
||||
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
|
||||
set scan_duration.
|
||||
|
||||
Args:
|
||||
scan_duration (float): Duration of the scan.
|
||||
daq (DeviceBase, optional): DAQ device to be used for the scan.
|
||||
Defaults to "nidaq".
|
||||
Examples:
|
||||
>>> scans.nidaq_continuous_scan(scan_duration=10)
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.scan_duration = scan_duration
|
||||
self.daq = daq
|
||||
self.start_time = 0
|
||||
self.primary_readout_cycle = 1
|
||||
self.scan_parameters["scan_duration"] = scan_duration
|
||||
self.scan_parameters["compression"] = compression
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan."""
|
||||
yield None
|
||||
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self.start_time = time.time()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
|
||||
"""
|
||||
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
|
||||
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
|
||||
|
||||
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
self.num_pos = self.point_id
|
||||
@@ -0,0 +1,216 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.bec_service import BECService
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.file_utils import FileWriter
|
||||
from bec_lib.redis_connector import MessageObject, RedisConnector
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class NIDAQWriterService(BECService):
|
||||
"""
|
||||
A service that receives data from the NIDAQ through Redis and writes it continuously to a file.
|
||||
"""
|
||||
|
||||
reshape_dataset = True
|
||||
use_redis_stream = True
|
||||
|
||||
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
|
||||
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
|
||||
self.queue = queue.Queue()
|
||||
config = self._service_config.config.get("file_writer")
|
||||
self.writer_mixin = FileWriter(service_config=config)
|
||||
self._scan_status_consumer = None
|
||||
self._ni_data_consumer = None
|
||||
self._ni_data_event = None
|
||||
self._ni_writer = None
|
||||
self._ni_writer_event = None
|
||||
|
||||
self.scan_number = None
|
||||
self.scan_is_running = False
|
||||
self.filename = ""
|
||||
|
||||
self.elapsed_time = 0
|
||||
self.start_time = 0
|
||||
self._start_scan_status_consumer()
|
||||
self._start_ni_data_consumer()
|
||||
self._start_ni_writer()
|
||||
|
||||
def _start_scan_status_consumer(self) -> None:
|
||||
"""
|
||||
Start the scan consumer.
|
||||
"""
|
||||
self._scan_status_consumer = self.connector.consumer(
|
||||
MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self
|
||||
)
|
||||
self._scan_status_consumer.start()
|
||||
|
||||
@staticmethod
|
||||
def _scan_status_callback(message: MessageObject, parent: NIDAQWriterService) -> None:
|
||||
"""
|
||||
Callback for scan status messages.
|
||||
"""
|
||||
msg = message.value
|
||||
if not msg:
|
||||
return
|
||||
parent.handle_scan_status(msg)
|
||||
|
||||
def _start_ni_data_consumer(self) -> None:
|
||||
"""
|
||||
Start the NI data consumer.
|
||||
"""
|
||||
|
||||
self._ni_data_event = threading.Event()
|
||||
self._ni_data_consumer = threading.Thread(target=self._run_read_loop, daemon=True)
|
||||
self._ni_data_consumer.start()
|
||||
|
||||
def _start_ni_writer(self) -> None:
|
||||
"""
|
||||
Start the NI data writer.
|
||||
"""
|
||||
self._ni_writer_event = threading.Event()
|
||||
self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
|
||||
self._ni_writer.start()
|
||||
|
||||
def _run_read_loop(self) -> None:
|
||||
"""
|
||||
Run the read loop.
|
||||
"""
|
||||
while not self._ni_data_event.is_set():
|
||||
self._read_data()
|
||||
|
||||
def _read_data(self):
|
||||
"""
|
||||
Read data from Redis.
|
||||
"""
|
||||
if not self.scan_is_running:
|
||||
time.sleep(0.01)
|
||||
return
|
||||
|
||||
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
|
||||
|
||||
start_time = time.time()
|
||||
if self.use_redis_stream:
|
||||
msg = self.connector.xread("ni_data")
|
||||
|
||||
if msg:
|
||||
try:
|
||||
num_msgs = len(msg[0][1])
|
||||
logger.debug(
|
||||
f"Received {num_msgs} messages in {time.time() - start_time} seconds"
|
||||
)
|
||||
msgs = [messages.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
logger.debug(
|
||||
f"Handled {num_msgs} messages in {time.time() - start_time} seconds"
|
||||
)
|
||||
except Exception as exc:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Failed to parse message: {content}")
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
msgs = self.connector._redis_conn.lpop("ni_data:val", 20)
|
||||
time.sleep(0.001)
|
||||
if msgs:
|
||||
try:
|
||||
msgs = [messages.DeviceMessage.loads(msg) for msg in msgs]
|
||||
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
except Exception as exc:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Failed to parse message: {content}")
|
||||
|
||||
def handle_scan_status(self, msg: messages.ScanStatusMessage) -> None:
|
||||
"""
|
||||
Handle scan status messages.
|
||||
|
||||
Args:
|
||||
msg: The scan status message.
|
||||
"""
|
||||
status = msg.content["status"]
|
||||
if status == "open":
|
||||
self.scan_number = msg.content["info"].get("scan_number")
|
||||
if self.scan_number is not None:
|
||||
self.scan_is_running = True
|
||||
else:
|
||||
self.scan_is_running = False
|
||||
|
||||
def handle_ni_data(self, msgs: list[messages.DeviceMessage]) -> None:
|
||||
"""
|
||||
Receive NI data messages and write them to the writer queue.
|
||||
|
||||
Args:
|
||||
msgs: The NI data messages.
|
||||
"""
|
||||
logger.info(f"Received {len(msgs)} NI data messages")
|
||||
|
||||
# concatenate all messages
|
||||
signals = {}
|
||||
|
||||
for key in msgs[0].content["signals"]:
|
||||
signals[key] = np.concatenate([msg.content["signals"][key]["value"] for msg in msgs])
|
||||
|
||||
# write data to queue
|
||||
self.queue.put(signals)
|
||||
|
||||
def _write_data(self) -> None:
|
||||
"""
|
||||
Get data from the writer queue and write it to disk.
|
||||
"""
|
||||
while not self._ni_writer_event.is_set():
|
||||
signals = self.queue.get()
|
||||
logger.info(f"Remaining tasks: {self.queue.qsize()}")
|
||||
self.write_data(signals)
|
||||
self.queue.task_done()
|
||||
|
||||
def write_data(self, signals: dict) -> None:
|
||||
"""
|
||||
Write data to disk.
|
||||
|
||||
Args:
|
||||
signals: The signals to write to disk.
|
||||
"""
|
||||
# create a new file if it doesn't exist, otherwise append to it
|
||||
logger.info("Writing NI data to HDF5 file")
|
||||
start_time = time.time()
|
||||
if not self.filename:
|
||||
return
|
||||
with h5py.File(self.filename, "a") as file:
|
||||
if self.reshape_dataset:
|
||||
for key in signals:
|
||||
# if the dataset already exists, append to it
|
||||
if key in file:
|
||||
dataset = file[key]
|
||||
dataset.resize(dataset.shape[0] + len(signals[key]), axis=0)
|
||||
dataset[-len(signals[key]) :] = signals[key]
|
||||
# otherwise create a new dataset
|
||||
else:
|
||||
file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
|
||||
else:
|
||||
# get all group names
|
||||
group_names = list(file.keys())
|
||||
|
||||
# get max dataset number
|
||||
dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name]
|
||||
|
||||
if dataset_num:
|
||||
dataset_num = max(dataset_num) + 1
|
||||
else:
|
||||
dataset_num = 0
|
||||
group = file.create_group(f"dataset_{dataset_num}")
|
||||
for key in signals:
|
||||
group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
|
||||
logger.info(f"Finished writing NI data in {time.time() - start_time} seconds")
|
||||
@@ -0,0 +1 @@
|
||||
from .NIDAQ_writer import NIDAQWriterService
|
||||
@@ -0,0 +1,34 @@
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
|
||||
|
||||
def send_scan_status(scan_number, status):
|
||||
if status == "start":
|
||||
scan_status_msg = messages.ScanStatusMessage(
|
||||
scanID="test", status="open", info={"scan_number": scan_number}
|
||||
)
|
||||
elif status == "stop":
|
||||
scan_status_msg = messages.ScanStatusMessage(
|
||||
scanID="test", status="closed", info={"scan_number": scan_number}
|
||||
)
|
||||
else:
|
||||
raise ValueError("Unknown status")
|
||||
|
||||
producer = RedisConnector(["localhost:6379"]).producer()
|
||||
producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
|
||||
print(f"Sent scan status message {scan_status_msg}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Scan status helper")
|
||||
command = parser.add_subparsers(dest="command")
|
||||
start = command.add_parser("start", help="Start a new scan")
|
||||
start.add_argument("--scan_number", type=int, required=True, help="Scan number")
|
||||
stop = command.add_parser("stop", help="Stop the scan")
|
||||
stop.add_argument("--scan_number", type=int, required=True, help="Scan number")
|
||||
|
||||
args = parser.parse_args()
|
||||
send_scan_status(args.scan_number, args.command)
|
||||
@@ -0,0 +1,44 @@
|
||||
import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
|
||||
|
||||
class NIDAQSim(threading.Thread):
|
||||
use_redis_stream = True
|
||||
|
||||
def run(self):
|
||||
print("NIDAQSim running")
|
||||
index = 0
|
||||
producer = RedisConnector(["localhost:6379"]).producer()
|
||||
signal = np.asarray(range(index, index + 600000))
|
||||
signals = {"signal1": signal, "signal2": signal}
|
||||
|
||||
msg = messages.DeviceMessage(signals=signals)
|
||||
msg = msg.dumps()
|
||||
messages.DeviceMessage.loads(msg)
|
||||
total_time = time.time()
|
||||
while True:
|
||||
# if index > 1000:
|
||||
# break
|
||||
start = time.time()
|
||||
# signals = {
|
||||
# "signal1": np.asarray(range(index, index + 300000)),
|
||||
# "signal2": np.asarray(range(index, index + 300000)),
|
||||
# }
|
||||
|
||||
index += 1
|
||||
|
||||
if self.use_redis_stream:
|
||||
producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
|
||||
else:
|
||||
producer.lpush("ni_data", msg, max_size=10)
|
||||
|
||||
time.sleep(0.5)
|
||||
print(f"Elapsed time: {time.time() - start}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
NIDAQSim().start()
|
||||
@@ -0,0 +1,29 @@
|
||||
import argparse
|
||||
import threading
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
|
||||
from superxas_bec.services.NIDAQ_writer import NIDAQWriterService
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("--config", default="", help="path to the config file")
|
||||
clargs = parser.parse_args()
|
||||
config_path = clargs.config
|
||||
|
||||
config = ServiceConfig(config_path)
|
||||
bec_logger.level = bec_logger.LOGLEVEL.INFO
|
||||
logger = bec_logger.logger
|
||||
|
||||
bec_server = NIDAQWriterService(config=config, connector_cls=RedisConnector)
|
||||
try:
|
||||
event = threading.Event()
|
||||
# pylint: disable=E1102
|
||||
logger.success("Started NIDAQ writer service")
|
||||
event.wait()
|
||||
except KeyboardInterrupt as e:
|
||||
# bec_server.connector.raise_error("KeyboardInterrupt")
|
||||
bec_server.shutdown()
|
||||
event.set()
|
||||
raise e
|
||||
@@ -0,0 +1,355 @@
|
||||
"""Unit tests for the Timepix device with a mocked backend."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from superxas_bec.devices.timepix.timepix import (
|
||||
ACQUIRESTATUS,
|
||||
DATASOURCE,
|
||||
EXPOSUREMODE,
|
||||
FILEWRITEMODE,
|
||||
TDCEdge,
|
||||
TDCOuput,
|
||||
TRIGGERMODE,
|
||||
TRIGGERSOURCE,
|
||||
Timepix,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
NetAddresses,
|
||||
PixelMap,
|
||||
)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
class FakeBackendClient:
|
||||
"""Backend client double that can finish status callbacks on demand."""
|
||||
|
||||
def __init__(self):
|
||||
self.status = TimePixFlyStatus.CONFIG
|
||||
self._status_callbacks = {}
|
||||
|
||||
def add_status_callback(self, status, success, error, run=True):
|
||||
"""Register status callback with optional immediate completion."""
|
||||
if run and self.status in success:
|
||||
status.set_finished()
|
||||
return
|
||||
self._status_callbacks[id(status)] = (status, success, error)
|
||||
|
||||
def emit_status(self, status_value: TimePixFlyStatus):
|
||||
"""Resolve tracked status objects with a simulated backend state."""
|
||||
self.status = status_value
|
||||
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
|
||||
with status._lock:
|
||||
if status.done:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
continue
|
||||
if status_value in success:
|
||||
status.set_finished()
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
elif status_value in error:
|
||||
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
def get_net_addresses(self):
|
||||
"""Return a deterministic backend raw-data address."""
|
||||
return NetAddresses(
|
||||
control="127.0.0.1:8452",
|
||||
address="127.0.0.1:8451",
|
||||
server="127.0.0.1:8080",
|
||||
)
|
||||
|
||||
|
||||
class FakeBackend:
|
||||
"""Minimal backend double used to isolate Timepix from backend integration."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.hostname = kwargs.get("hostname") or "localhost"
|
||||
self.socket_port = kwargs.get("socket_port", 9876)
|
||||
self.timepix_fly_client = FakeBackendClient()
|
||||
self.on_connected = mock.Mock()
|
||||
self.on_stage = mock.Mock()
|
||||
self.on_stop = mock.Mock()
|
||||
self.on_destroy = mock.Mock()
|
||||
self.add_callback = mock.Mock()
|
||||
self._trigger_status = StatusBase()
|
||||
self._trigger_status.set_finished()
|
||||
self._complete_status = StatusBase()
|
||||
self._complete_status.set_finished()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Return a backend-prepared trigger status."""
|
||||
return self._trigger_status
|
||||
|
||||
def on_trigger_finished(self):
|
||||
"""Return the status that resolves when acquisition is complete."""
|
||||
return self._complete_status
|
||||
|
||||
def on_complete(self, status=None):
|
||||
"""Return a backend completion status."""
|
||||
if status is None:
|
||||
return self._complete_status
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
def _finished_status(device=None):
|
||||
"""Create a finished status for mocked signal set operations."""
|
||||
status = DeviceStatus(device=device)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
def _force_signal_value(signal, value):
|
||||
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
|
||||
if hasattr(signal, "_read_pv"):
|
||||
signal._read_pv.mock_data = value
|
||||
return
|
||||
signal.put(value)
|
||||
|
||||
|
||||
def _message_value(signal):
|
||||
"""Extract the signal payload from a BEC message signal."""
|
||||
msg = signal.get()
|
||||
return msg.signals[signal.name]["value"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix():
|
||||
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
|
||||
backend = FakeBackend()
|
||||
scan_info = SimpleNamespace(
|
||||
msg=SimpleNamespace(
|
||||
scan_name="step_scan",
|
||||
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
|
||||
num_points=3,
|
||||
)
|
||||
)
|
||||
with (
|
||||
mock.patch.object(ophyd, "cl") as mock_cl,
|
||||
mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend
|
||||
),
|
||||
):
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = Timepix(
|
||||
name="timepix",
|
||||
prefix="X10DA-ES-TPX1:",
|
||||
backend_rest_url="localhost:8452",
|
||||
hostname="localhost",
|
||||
socket_port=9876,
|
||||
scan_info=scan_info,
|
||||
device_manager=DMMock(),
|
||||
)
|
||||
patch_dual_pvs(dev)
|
||||
dev.backend = backend
|
||||
dev._poll_thread = mock.Mock()
|
||||
for walk in dev.walk_signals():
|
||||
signal = walk.item
|
||||
if hasattr(signal, "set") and hasattr(signal, "put"):
|
||||
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
|
||||
yield dev
|
||||
dev._poll_thread_kill_event.set()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pixel_map():
|
||||
"""Small valid pixel map used for stage tests."""
|
||||
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
|
||||
|
||||
|
||||
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
|
||||
"""Connected hook should configure camera, file writer, and backend callback."""
|
||||
timepix.on_connected()
|
||||
|
||||
assert timepix.cam.tdc1_enable.get() == 1
|
||||
assert timepix.cam.tdc2_enable.get() == 1
|
||||
assert timepix.cam.raw_enable.get() == 1
|
||||
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
|
||||
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
|
||||
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
|
||||
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
|
||||
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
|
||||
assert timepix.cam.array_counter.get() == 0
|
||||
assert timepix.hdf.enable.get() == "1"
|
||||
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
|
||||
assert timepix.hdf.auto_save.get() == 1
|
||||
assert timepix.cam.array_callbacks.get() == 1
|
||||
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
|
||||
timepix._poll_thread.start.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
|
||||
"""Stage should configure camera settings and forward config to the backend."""
|
||||
timepix._pixel_map = pixel_map
|
||||
|
||||
with (
|
||||
mock.patch.object(timepix.hdf.enable, "get", return_value=1),
|
||||
mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.get_full_path",
|
||||
return_value="/tmp/timepix/test_scan.h5",
|
||||
),
|
||||
):
|
||||
timepix.on_stage()
|
||||
|
||||
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
|
||||
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
|
||||
assert timepix.cam.num_images.get() == 2
|
||||
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
|
||||
assert timepix.hdf.file_path.get() == "/tmp/timepix"
|
||||
assert timepix.hdf.file_name.get() == "test_scan.h5"
|
||||
assert timepix.hdf.num_capture.get() == 6
|
||||
assert timepix.hdf.capture.get() == 1
|
||||
assert timepix.cam.raw_file_template.get() == ""
|
||||
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
|
||||
assert timepix.backend.on_stage.call_count == 1
|
||||
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
|
||||
assert other_config.output_uri == "tcp:localhost:9876"
|
||||
assert other_config.TRoiStep == timepix.troistep
|
||||
assert other_config.TRoiN == timepix.troin
|
||||
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
|
||||
file_event = timepix.file_event.get()
|
||||
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
|
||||
assert file_event.done is False
|
||||
assert file_event.successful is False
|
||||
|
||||
|
||||
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
|
||||
"""When XES is disabled, the backend-specific stage call should be skipped."""
|
||||
timepix.enable_xes = False
|
||||
|
||||
with mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.get_full_path",
|
||||
return_value="/tmp/timepix/no_xes_scan.h5",
|
||||
):
|
||||
timepix.on_stage()
|
||||
|
||||
timepix.backend.on_stage.assert_not_called()
|
||||
|
||||
|
||||
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
|
||||
"""Trigger should arm the backend first and then drive the camera."""
|
||||
backend_done = StatusBase()
|
||||
backend_done.set_finished()
|
||||
backend_finished = StatusBase()
|
||||
timepix.backend._trigger_status = backend_done
|
||||
timepix.backend._complete_status = backend_finished
|
||||
|
||||
status = timepix.on_trigger()
|
||||
|
||||
assert isinstance(status, StatusBase)
|
||||
assert timepix.cam.acquire.get() == 1
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
backend_finished.set_finished()
|
||||
status.wait(timeout=0.1)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_timepix_on_complete_marks_file_event_success(timepix):
|
||||
"""Complete should wait for writer/backend completion and emit a success file event."""
|
||||
timepix._full_path = "/tmp/timepix/final_scan.h5"
|
||||
timepix._n_images = 3
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
|
||||
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
|
||||
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
|
||||
|
||||
with mock.patch.object(timepix.hdf.enable, "get", return_value=1):
|
||||
status = timepix.on_complete()
|
||||
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.write_status.put(0)
|
||||
_force_signal_value(timepix.hdf.num_captured, 3)
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
file_event = timepix.file_event.get()
|
||||
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
|
||||
assert file_event.done is True
|
||||
assert file_event.successful is True
|
||||
|
||||
|
||||
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
|
||||
"""The backend message callback should populate all exported XES data signals."""
|
||||
start_frame = {
|
||||
"type": "StartFrame",
|
||||
"Mode": "TOA",
|
||||
"TRoiStart": 0,
|
||||
"TRoiStep": 1,
|
||||
"TRoiN": 2,
|
||||
"NumEnergyPoints": 8,
|
||||
"save_interval": 1,
|
||||
}
|
||||
data_frames = [
|
||||
{
|
||||
"type": "XesData",
|
||||
"period": 131000,
|
||||
"totalEvents": 36,
|
||||
"TDSpectra": list(range(16)),
|
||||
"beforeROI": 0,
|
||||
"afterROI": 0,
|
||||
}
|
||||
]
|
||||
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
|
||||
timepix.troin = 2
|
||||
|
||||
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
|
||||
|
||||
expected_xes = np.array(
|
||||
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
|
||||
)
|
||||
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
|
||||
)
|
||||
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
|
||||
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
|
||||
assert _message_value(timepix.total_events) == 36
|
||||
|
||||
|
||||
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
|
||||
"""Stop should stop camera acquisition and delegate backend stop."""
|
||||
timepix.cam.acquire.put(1)
|
||||
timepix.hdf.capture.put(1)
|
||||
|
||||
timepix.on_stop()
|
||||
|
||||
assert timepix.cam.acquire.get() == 0
|
||||
assert timepix.hdf.capture.get() == 0
|
||||
timepix.backend.on_stop.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_on_destroy_cleans_up_backend(timepix):
|
||||
"""Destroy should stop polling and forward cleanup to the backend."""
|
||||
timepix.on_destroy()
|
||||
|
||||
assert timepix._poll_thread_kill_event.is_set() is True
|
||||
timepix.backend.on_stop.assert_called_once()
|
||||
timepix.backend.on_destroy.assert_called_once()
|
||||
@@ -1,15 +1,199 @@
|
||||
"""This module tests the Timepix Fly backend functionality."""
|
||||
"""Unit tests for the Timepix Fly backend."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
|
||||
import pytest
|
||||
from ophyd import StatusBase
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import (
|
||||
TimepixFlyBackend,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
|
||||
TimePixFlyStatus,
|
||||
TimePixStatusError,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
NetAddresses,
|
||||
OtherConfigModel,
|
||||
PixelMap,
|
||||
)
|
||||
|
||||
|
||||
class FakeTimepixFlyClient:
|
||||
"""Minimal client double that can drive backend status callbacks."""
|
||||
|
||||
def __init__(self, rest_url: str, ws_url: str):
|
||||
self.rest_url = rest_url
|
||||
self.ws_url = ws_url
|
||||
self.status = TimePixFlyStatus.CONFIG
|
||||
self._status_callbacks = {}
|
||||
self.error_message = "boom"
|
||||
self.on_connected = mock.Mock()
|
||||
self.shutdown = mock.Mock()
|
||||
self.start = mock.Mock()
|
||||
self.stop_running_collection = mock.Mock()
|
||||
self.set_other_config = mock.Mock()
|
||||
self.set_pixel_map = mock.Mock()
|
||||
self.get_net_addresses = mock.Mock(
|
||||
return_value=NetAddresses(
|
||||
control="127.0.0.1:8452",
|
||||
address="127.0.0.1:8451",
|
||||
server="127.0.0.1:8080",
|
||||
)
|
||||
)
|
||||
|
||||
def add_status_callback(self, status, success, error, run=True):
|
||||
"""Store callbacks and optionally resolve them immediately."""
|
||||
if run:
|
||||
if self.status in success:
|
||||
status.set_finished()
|
||||
return
|
||||
if self.status in error:
|
||||
status.set_exception(
|
||||
TimePixStatusError(f"TimePixFly state '{self.status.value}': {self.error_message}")
|
||||
)
|
||||
return
|
||||
self._status_callbacks[id(status)] = (status, success, error)
|
||||
|
||||
def last_error(self):
|
||||
"""Return a lightweight error object."""
|
||||
return SimpleNamespace(message=self.error_message)
|
||||
|
||||
def emit_status(self, status_value: TimePixFlyStatus):
|
||||
"""Resolve stored status callbacks as if a websocket status update arrived."""
|
||||
self.status = status_value
|
||||
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
|
||||
with status._lock:
|
||||
if status.done:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
continue
|
||||
if status_value in success:
|
||||
status.set_finished()
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
elif status_value in error:
|
||||
status.set_exception(
|
||||
TimePixStatusError(
|
||||
f"TimePixFly state '{status_value.value}': {self.error_message}"
|
||||
)
|
||||
)
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix_fly_backend():
|
||||
"""Fixture for creating a Timepix Fly backend instance."""
|
||||
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
|
||||
yield backend
|
||||
def backend_with_states():
|
||||
"""Return a backend together with a helper that emits backend states."""
|
||||
with mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient",
|
||||
FakeTimepixFlyClient,
|
||||
):
|
||||
backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost")
|
||||
yield backend, backend.timepix_fly_client
|
||||
backend.on_destroy()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pixel_map():
|
||||
"""Small valid pixel map for backend unit tests."""
|
||||
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
|
||||
|
||||
|
||||
def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map):
|
||||
"""Stage should push both config objects to the client."""
|
||||
backend, client = backend_with_states
|
||||
other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16)
|
||||
|
||||
backend.on_stage(other_config=other_config, pixel_map=pixel_map)
|
||||
|
||||
client.set_other_config.assert_called_once_with(other_config)
|
||||
client.set_pixel_map.assert_called_once_with(pixel_map)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_trigger_callback_success(backend_with_states):
|
||||
"""Trigger status should resolve once the backend reports await_connection."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_trigger()
|
||||
assert status.done is False
|
||||
|
||||
state_driver.emit_status(TimePixFlyStatus.AWAIT_CONNECTION)
|
||||
|
||||
status.wait(timeout=0.1)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_timepix_fly_backend_trigger_callback_error(backend_with_states):
|
||||
"""Trigger status should fail when the backend reports an exception state."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_trigger()
|
||||
state_driver.error_message = "failed to configure"
|
||||
state_driver.emit_status(TimePixFlyStatus.EXCEPT)
|
||||
|
||||
with pytest.raises(TimePixStatusError, match="failed to configure"):
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_complete_callback_success(backend_with_states):
|
||||
"""Complete status should resolve when the backend goes back to config."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_complete()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
state_driver.emit_status(TimePixFlyStatus.CONFIG)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states):
|
||||
"""Stopping the backend should fail all tracked statuses and stop collection."""
|
||||
backend, client = backend_with_states
|
||||
status = StatusBase()
|
||||
backend.cancel_on_stop(status)
|
||||
|
||||
backend.on_stop()
|
||||
|
||||
client.stop_running_collection.assert_called_once()
|
||||
with pytest.raises(RuntimeError, match="Stop called on device"):
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_add_and_remove_callback(backend_with_states):
|
||||
"""Callbacks can be registered and removed by id."""
|
||||
backend, _ = backend_with_states
|
||||
|
||||
cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5})
|
||||
|
||||
stored_cb_id = next(iter(backend.callbacks))
|
||||
assert str(stored_cb_id) == cb_id
|
||||
backend.remove_callback(stored_cb_id)
|
||||
assert stored_cb_id not in backend.callbacks
|
||||
|
||||
|
||||
def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states):
|
||||
"""The buffered frame callback should be invoked only once EndFrame arrives."""
|
||||
backend, _ = backend_with_states
|
||||
received = {}
|
||||
|
||||
def callback(start_frame, data_frames, end_frame, scan_id):
|
||||
received["start_frame"] = start_frame
|
||||
received["data_frames"] = data_frames
|
||||
received["end_frame"] = end_frame
|
||||
received["scan_id"] = scan_id
|
||||
|
||||
backend.add_callback(callback, kwd={"scan_id": 7})
|
||||
backend._decode_received_data(
|
||||
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
|
||||
)
|
||||
backend._decode_received_data(
|
||||
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
|
||||
)
|
||||
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
|
||||
|
||||
assert received["start_frame"]["type"] == "StartFrame"
|
||||
assert received["data_frames"][0]["type"] == "XesData"
|
||||
assert received["end_frame"]["type"] == "EndFrame"
|
||||
assert received["scan_id"] == 7
|
||||
assert backend._TimepixFlyBackend__msg_buffer == []
|
||||
|
||||
Reference in New Issue
Block a user