22 Commits

Author SHA1 Message Date
x10da d17f3deefa Introduced nexus data structure (same as Debye)
CI for superxas_bec / test (pull_request) Successful in 34s
CI for superxas_bec / test (push) Successful in 34s
2026-05-07 14:10:40 +02:00
x10da f70ac8743d Added frontend absorber 2026-05-07 14:10:04 +02:00
x10da 2981c436db Updated config files 2026-05-07 14:10:04 +02:00
x10da 09a0bc6372 Adding signals of additional nidaq signals 2026-05-07 14:10:04 +02:00
x10da fff1e21481 Renaming of offset signals 2026-05-07 14:10:04 +02:00
x10da 2bfa7b6ca3 Change of order of nidaq signals 2026-05-07 14:10:04 +02:00
x10da 3f79b300ed Implemented logic to move goniometer to correct position. Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 36cffb72a5 Disabled XAS with XRD scans 2026-05-07 14:10:04 +02:00
x10da 55bc4585e2 Added new signals 2026-05-07 14:10:04 +02:00
x10da 4f1386f5e1 Bugfix Transistionstatus in kickoff 2026-05-07 14:10:04 +02:00
x10da d687d74a74 Added mono goniometer motor 2026-05-07 14:10:04 +02:00
hitz_s aa6270fb55 introduction of mo1_bragg and nidaq 2026-05-07 14:10:04 +02:00
appel_c 8f7a445d93 fix: delete pixelmap json, make it a method
CI for superxas_bec / test (pull_request) Successful in 34s
CI for superxas_bec / test (push) Successful in 32s
2026-05-07 13:05:17 +02:00
appel_c abc0229efb fix(timepix): remove status writer from trigger call
CI for superxas_bec / test (push) Successful in 32s
CI for superxas_bec / test (pull_request) Successful in 33s
2026-05-07 12:47:20 +02:00
appel_c 2b51e67734 refactor(timepix): enable/disable timepixfly backend 2026-05-07 12:47:20 +02:00
appel_c 85c482513d feat: Add pixel map folder for default pixel_map configurations. 2026-05-07 12:47:19 +02:00
appel_c 360dcc59ca fix(timepix): split logic of backend and control, backend client works 2026-05-07 12:47:19 +02:00
appel_c 962dbf8607 refactor(timepix): add ws status updates from backend 2026-05-07 12:47:18 +02:00
appel_c bb14841a3a refactor: refactored timepix integration, timepix_fly_client moved to seperate module 2026-05-07 12:47:18 +02:00
appel_c e7b409fa51 feat(timepix): Timepix integration with TimepixFly backend 2026-05-07 09:52:31 +02:00
perl_d a0c01ad51c Update repo with template version v1.2.8
CI for superxas_bec / test (pull_request) Successful in 39s
CI for superxas_bec / test (push) Successful in 34s
2026-02-27 15:49:26 +01:00
perl_d ec33ece7c6 Update repo with template version v1.2.7
CI for superxas_bec / test (push) Failing after 0s
CI for superxas_bec / test (pull_request) Failing after 0s
2026-02-27 12:11:40 +01:00
49 changed files with 7055 additions and 24 deletions
+1 -1
View File
@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.2
_commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: superxas_bec
+14 -9
View File
@@ -28,7 +28,7 @@ on:
description: "Python version to use"
required: false
type: string
default: "3.11"
default: "3.12"
permissions:
pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/superxas_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./superxas_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
uses: actions/checkout@v4
@@ -67,13 +79,6 @@ jobs:
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/superxas_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./superxas_bec
- name: Install dependencies
shell: bash
run: |
+62
View File
@@ -0,0 +1,62 @@
name: Create template upgrade PR for superxas_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if git diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"
-7
View File
@@ -1,7 +0,0 @@
include:
- file: /templates/plugin-repo-template.yml
inputs:
name: superxas_bec
target: superxas_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils
+8 -2
View File
@@ -6,13 +6,19 @@ build-backend = "hatchling.build"
name = "superxas_bec"
version = "0.0.0"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = []
dependencies = [
"numpy",
"scipy",
"xrt",
"websockets",
]
[project.optional-dependencies]
dev = [
@@ -0,0 +1,239 @@
"""
X10DA / SuperXAS Beamline Parameters.
This file describes the parameter of each component of the SuperXAS beamline
to be used for raytracing and geometrical calculations.
"""
import os
import numpy as np
from collections import namedtuple
import xrt.backends.raycing.materials as rm
# if os.environ.get("USE_XRT", "True").lower() in ("1", "true", "yes"):
# import xrt.backends.raycing.materials as rm # type: ignore
# else:
# class _DummyClass:
# def __init__(self, *args, **kwargs):
# pass
# class _DummyMaterials:
# Material = _DummyClass
# CrystalSi = _DummyClass
# rm = _DummyMaterials()
# XRT definitions
filterBeryl = rm.Material('Be', rho=1.85, kind='plate')
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
stripeSi = rm.Material('Si', rho=2.33)
stripePt = rm.Material('Pt', rho=21.45)
stripeRh = rm.Material('Rh', rho=12.41)
stripeCr = rm.Material('Cr', rho=7.14)
stripePyrex = rm.Material('Si', rho=2.20) # Use Si as bare element and the density of SiO2
si111_1 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # first xtal surface
si311_1 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # first xtal surface
si333_1 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # first xtal surface
si511_1 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # first xtal surface
si111_2 = rm.CrystalSi(hkl=(1, 1, 1), tK=77) # second xtal surface
si311_2 = rm.CrystalSi(hkl=(3, 1, 1), tK=77) # second xtal surface
si333_2 = rm.CrystalSi(hkl=(3, 3, 3), tK=77) # second xtal surface
si511_2 = rm.CrystalSi(hkl=(5, 1, 1), tK=77) # second xtal surface
filterDiamond = rm.Material('C', rho=3.52, kind='plate')
filterBe = rm.Material('Be', rho=1.85, kind='plate')
filterSi3N4 = rm.Material(['Si', 'N'], quantities=[3, 4], rho=3.44, kind='plate')
filterAl = rm.Material('Al', rho=2.69, kind='plate')
filterGraphite = rm.Material('C', rho=2.266, kind='plate')
sourceHeight = 0
#Synchrotron
synchrotron = namedtuple('synchrotron', ['eE', 'eI', 'eEspread',
'eEpsilonX', 'eEpsilonZ', 'betaX', 'betaZ'])
sls1 = synchrotron(
eE = 2.4,
eI = 0.4,
eEspread=0.878e-3,
eEpsilonX=5.63,
eEpsilonZ=0.007,
betaX=0.45,
betaZ=14.4,
)
sls2 = synchrotron(
eE=2.7,
eI=0.4,
eEspread=1.147e-3,
eEpsilonX=0.156,
eEpsilonZ=0.01,
betaX=0.18,
betaZ=4.6,
)
# Source
bendingMagnet = namedtuple('bendingMagnet', ['name', 'center', 'sync', 'B0'])
sls1_29t = bendingMagnet(
name='FE-BM-SLS1-2.9T',
center=(0, 0, 0),
sync=sls1,
B0=2.9,)
sls2_21t = bendingMagnet(
name='FE-BM-SLS2-2.1T',
center=(0, 0, 0),
sync=sls1,  # NOTE(review): magnet is named SLS2 but reuses the sls1 lattice parameters — confirm this is intentional
B0=2.1,)
# FE slits
slits = namedtuple('slits', ['name', 'center', 'maxDivH', 'maxDivV'])
feSlits = slits(
name='FE-SLITS',
center=(0, 5290, sourceHeight),
maxDivH=1.8e-3,
maxDivV=0.8e-3,)
# Filters
filt = namedtuple('filt', ['name', 'center', 'pitch', 'limPhysX', 'limPhysY', 'surface', 'material', 'thickness'])
feWindow = filt(
name='FE-WINDOW',
center=(0., 6158, sourceHeight),
pitch=np.pi/2,
limPhysX=(-6, 6),
limPhysY=(-3., 3.),
surface='None',
material=filterDiamond,
thickness=0.1,)
feWindow = feWindow._replace(surface=r'CVD Diamond window {0:0.0f} $\mu$m'.format(feWindow.thickness*1e3))
feFilt = filt(
name='FE-FI',
center=(0., 6590, sourceHeight),
pitch=np.pi/2,
limPhysX=(-15, 15),
limPhysY=(-10, 10),
surface='None',
material=filterGraphite,
thickness=0.25,)
feFilt = feFilt._replace(surface=r'Graphite filter {0:0.0f} $\mu$m'.format(feFilt.thickness*1e3))
# Collimating mirror
collimatingMirror = namedtuple('collimatingMirror', ['name',
'center', 'surface', 'material', 'limPhysX', 'limPhysY',
'limOptX', 'limOptY', 'R', 'pitch', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
cm = collimatingMirror(
name='FE-CM',
center=[0, 7618, sourceHeight],
surface=('Rh','Si','Pt'),
material=(stripeRh, stripeSi, stripePt),
limPhysX=(-30, 30),
limPhysY=(-600, 600),
limOptX=((11, -2, -21), (21, 8, -5)),
limOptY=((-500, -500, -500), (500, 500, 500)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
jack1=[0., 7210., 0.], #Tripod X, Y, Z (global)
jack2=[-210., 8310., 0.],
jack3=[210., 8310., 0.],
tx1=[0.0, -575.5], # X-Stage 1 [x, y] (local)
tx2=[0.0, 575],) # X-Stage 2
apertures = namedtuple('apertures', ['name', 'center', 'opening'])
fePS = apertures(
name='FE-PS',
center=[0, 8760, sourceHeight],
opening=[-39/2, 39/2, -10, 29]) # left, right, bottom, top
opWbBsBlock = apertures(
name='OP-WB-BS-BLOCK',
center=[0., 13606-135, sourceHeight],
opening=[-18., 18., 42, 76]) # left, right, bottom, top
opSlits = apertures(
name='OP-SLITS',
center=[0, 14145-135, sourceHeight],
opening=[-35/2, 35/2, 47.5, 82.5])
# Monochromator
monochromator = namedtuple('monochromator', ['name', 'center',
'xtal', 'material1', 'material2', 'xtalWidth', 'xtalOffsetX',
'xtalLength1', 'xtalLength2', 'xtalGap', 'rotOffset',
'heightOffset', 'braggLim', 'jack1', 'jack2', 'jack3', 'tx'])
mo1 = monochromator(
name='OP-CCM1',
center=[0., 11670-135, sourceHeight],
xtal=('Si311','Si111'),
material1=(si311_1, si111_1),
material2=(si311_2, si111_2),
xtalWidth = (20, 20),
xtalOffsetX=(-19.2, 19.2),
xtalLength1 = (60, 60),
xtalLength2 = (60, 60),
xtalGap = (8, 8),
rotOffset = 6, # not sure what it is
heightOffset = 8.5, # not sure what it is
braggLim = [4, 35],
jack1=[0., 11350., 0.], #Tripod not available!
jack2=[-400., 12350., 0.],
jack3=[400., 12350., 0.],
tx=0.0,) # X-Stage [x]
# Focusing mirror
focusingMirror = namedtuple('focusingMirror', ['name', 'center',
'surfaceToroid', 'materialToroid',
'limPhysXToroid', 'limPhysYToroid',
'limOptXToroid', 'limOptYToroid',
'R', 'pitch', 'r', 'xToroid', 'hToroid', 'jack1', 'jack2', 'jack3',
'tx1', 'tx2'])
fm = focusingMirror(
name='OP-FM',
center=[0., 15580-135, sourceHeight],
surfaceToroid=('Rh', 'Pt'),
materialToroid=(stripeRh, stripePt),
limPhysXToroid=(-54., 54.),
limPhysYToroid=(-565., 565.),
limOptXToroid=((90.25, 41.75), (51.75, 5.75)), # With old VME axis, no absolute value!
limOptYToroid=((-500., -500.), (500., 500.)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
r=[30, 20],
xToroid=[24.126, -22.874], # offset in local x
hToroid=[7., 11.3], # depth of the cylinder at x = xCylinder1 and x = xCylinder2.
jack1=[0., 14980., 0.],
jack2=[-75., 16180., 0.],
jack3=[75., 16180., 0.],
tx1=[0., -575.], # X-Stage 1 [x, y]
tx2=[0., 575.],) # X-Stage 2 [x, y]
ehWindow = filt(
name='EH-WINDOW',
center=(0., 22225-135, sourceHeight),
pitch=np.pi/2,
limPhysX=(-10., 10.),
limPhysY=(17.5, 92.5),
surface='None',
material=filterBe,
thickness=0.25,)
ehWindow = ehWindow._replace(surface=r'Beryllium window {0:0.0f} $\mu$m'.format(ehWindow.thickness*1e3))
# Sample
sample = namedtuple('sample', ['name', 'center'])
smpl = sample(
name='OP-SMPL',
center=[0, 24000-135, sourceHeight],)
@@ -0,0 +1,27 @@
manip_new_trx:
description: Sample Manipulator X-Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRX
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
manip_new_try:
description: Sample Manipulator Y-Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRY
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
manip_new_trz:
description: Sample Manipulator Z - Along beam
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRZ
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
+71
View File
@@ -0,0 +1,71 @@
sample_manipulator:
- !include ./sample_manipulator.yaml
### Timepix Detector
timepix:
readoutPriority: async
description: ASI Serval Timepix Detector
deviceClass: superxas_bec.devices.timepix.timepix.Timepix
deviceConfig:
prefix: "X10DA-ES-TPX1:"
backend_rest_url: "P6-0008.psi.ch:8452"
hostname: "x10da-bec-001.psi.ch"
enable_xes: false
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: true
### Ionization Chambers
ic1:
readoutPriority: monitored
description: Ionization Chamber 1
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_01:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
ic2:
readoutPriority: monitored
description: Ionization Chamber 2
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_02:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
ic3:
readoutPriority: monitored
description: Ionization Chamber 3
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_03:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
### Monochromator Axis
mono_energy:
description: Axis for the QEXAFS monochromator
deviceClass: ophyd_devices.devices.psi_motor.EpicsMotor
deviceConfig:
prefix: "X10DA-MO12-QEXAFS:E_TEST"
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
### Trigger Card #####
trigger:
readoutPriority: baseline
description: Trigger Card
deviceClass: superxas_bec.devices.trigger.Trigger
deviceConfig:
prefix: 'X10DA-ES1:'
onFailure: raise
enabled: True
softwareTrigger: True
@@ -1,4 +1,3 @@
#######################################
## Beam Monitors 2 and 3 -- Virtual positioners
@@ -0,0 +1,243 @@
###################################
## Frontend Absorber ##
###################################
abs:
readoutPriority: baseline
description: Frontend Absorber
deviceClass: superxas_bec.devices.absorber.Absorber
deviceConfig:
prefix: "X10DA-FE-ABS1:"
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Frontend Slits ##
###################################
sldi_trxr:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sldi_trxw:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryb:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryt:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centerx:
readoutPriority: baseline
description: Front-end slit diaphragm X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapx:
readoutPriority: baseline
description: Front-end slit diaphragm X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centery:
readoutPriority: baseline
description: Front-end slit diaphragm Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapy:
readoutPriority: baseline
description: Front-end slit diaphragm Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-SLDI:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Collimating Mirror ##
###################################
cm_trxu:
readoutPriority: baseline
description: Collimating Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trxd:
readoutPriority: baseline
description: Collimating Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
cm_tryu:
readoutPriority: baseline
description: Collimating Mirror Y-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydr:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDR
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydw:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:TRYDW
onFailure: retry
enabled: true
softwareTrigger: false
cm_bnd:
readoutPriority: baseline
description: Collimating Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:BND
onFailure: retry
enabled: true
softwareTrigger: false
# cm_bnd_radius:
# readoutPriority: baseline
# description: Collimating Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-CM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
cm_rotx:
readoutPriority: baseline
description: Collimating Mirror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
cm_roty:
readoutPriority: baseline
description: Collimating Mirror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
cm_rotz:
readoutPriority: baseline
description: Collimating Mirror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
cm_trx:
readoutPriority: baseline
description: Collimating Mirror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_try:
readoutPriority: baseline
description: Collimating Mirror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ztcp:
readoutPriority: baseline
description: Collimating Mirror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_xstripe:
readoutPriority: baseline
description: Collimating Mirror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-FE-CM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
@@ -0,0 +1,18 @@
###################################
## SLS Machine ##
###################################
curr:
readoutPriority: baseline
description: SLS ring current
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: AGEBD-DBPM3CURR:CURRENT-AVG
deviceTags:
- machine
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
@@ -0,0 +1,396 @@
###################################
## Monochromator ##
###################################
mo1_try:
readoutPriority: baseline
description: Monochromator Y Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo1_trx:
readoutPriority: baseline
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:TRX
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 1 ##
###################################
# sl1_trxr:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_trxw:
# readoutPriority: baseline
# description: Optics slits 1 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryb:
# readoutPriority: baseline
# description: Optics slits 1 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_tryt:
# readoutPriority: baseline
# description: Optics slits 1 Y-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm1_try:
# readoutPriority: baseline
# description: Beam Monitor 1 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM1:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centerx:
# readoutPriority: baseline
# description: Optics slits 1 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapx:
# readoutPriority: baseline
# description: Optics slits 1 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_centery:
# readoutPriority: baseline
# description: Optics slits 1 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl1_gapy:
# readoutPriority: baseline
# description: Optics slits 1 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL1:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
###################################
## Focusing Mirror ##
###################################
# fm_trxu:
# readoutPriority: baseline
# description: Focusing Mirror X-translation upstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXU
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_trxd:
# readoutPriority: baseline
# description: Focusing Mirror X-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRXD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryd:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation downstream
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYD
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryur:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream ring
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# fm_tryuw:
# readoutPriority: baseline
# description: Focusing Mirror Y-translation upstream wall
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-FM:TRYUW
# onFailure: retry
# enabled: true
# softwareTrigger: false
fm_bnd:
readoutPriority: baseline
description: Focusing Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP-MI1:TRB
onFailure: retry
enabled: true
softwareTrigger: false
# fm_bnd_radius:
# readoutPriority: baseline
# description: Focusing Mirror Bending Radius
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig:
# read_pv: X10DA-CPCL-FM:BNDFORCE
# onFailure: retry
# readOnly: true
# enabled: true
# softwareTrigger: false
fm_rotx:
readoutPriority: baseline
description: Focusing Mirror Pitch
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:pitch
onFailure: retry
enabled: true
softwareTrigger: false
fm_roty:
readoutPriority: baseline
description: Focusing Mirror Yaw
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:yaw
onFailure: retry
enabled: true
softwareTrigger: false
fm_rotz:
readoutPriority: baseline
description: Focusing Mirror Roll
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:roll
onFailure: retry
enabled: true
softwareTrigger: false
fm_trx:
readoutPriority: baseline
description: Focusing Mirror Center Point X
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:trans
onFailure: retry
enabled: true
softwareTrigger: false
fm_try:
readoutPriority: baseline
description: Focusing Mirror Center Point Y
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: X10DA-OP-MI1:y
onFailure: retry
enabled: true
softwareTrigger: false
###################################
## Optics Slits + Beam Monitor 2 ##
###################################
# sl2_trxr:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Ring-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXR
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_trxw:
# readoutPriority: baseline
# description: Optics slits 2 X-translation Wall-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRXW
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryb:
# readoutPriority: baseline
# description: Optics slits 2 Y-translation Bottom-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYB
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_tryt:
# readoutPriority: baseline
# description: Optics slits 2 Y-translation Top-edge
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:TRYT
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# bm2_try:
# readoutPriority: baseline
# description: Beam Monitor 2 Y-translation
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-BM2:TRY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centerx:
# readoutPriority: baseline
# description: Optics slits 2 X-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapx:
# readoutPriority: baseline
# description: Optics slits 2 X-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPX
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_centery:
# readoutPriority: baseline
# description: Optics slits 2 Y-center
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:CENTERY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
# sl2_gapy:
# readoutPriority: baseline
# description: Optics slits 2 Y-gap
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X10DA-OP-SL2:GAPY
# onFailure: retry
# enabled: true
# softwareTrigger: false
# deviceTags:
# - optics
# - slits
@@ -0,0 +1,89 @@
###################################
## General ##
###################################
## SLS Machine
machine_config:
- !include ./x10da_machine.yaml
## Beam Monitors OP + EH
# beam_monitors_config:
# - !include ./x10da_beam_monitors.yaml
###################################
## Frontend ##
###################################
## Frontend
frontend_config:
- !include ./x10da_frontend.yaml
###################################
## Optics Hutch ##
###################################
## Bragg Monochromator
mo1_gonio:
readoutPriority: baseline
description: Monochromator ROTX Goniometer
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-OP1-MO1:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X10DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
# mo1_bragg_angle:
# readoutPriority: baseline
# description: Positioner for the Monochromator
# deviceClass: superxas_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
# deviceConfig:
# prefix: "X10DA-OP-MO1:BRAGG:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
# Remaining optics hutch
optics_config:
- !include ./x10da_optics.yaml
###################################
## Experimental Hutch ##
###################################
# ## NIDAQ
nidaq:
readoutPriority: monitored
description: NIDAQ backend for data reading for superxas scans
deviceClass: superxas_bec.devices.nidaq.nidaq.Nidaq
deviceConfig:
prefix: "X10DA-CPCL-SCANSERVER:"
onFailure: retry
enabled: true
softwareTrigger: false
## XAS (ICx, SDD, ref foils)
# xas_config:
# - !include ./x10da_xas.yaml
## XRD (Pilatus, pinhole, beamstop)
#xrd_config:
# - !include ./x10da_xrd.yaml
# Commented out because too slow
## Hutch cameras
# hutch_cams:
# - !include ./x10da_hutch_cameras.yaml
## Remaining experimental hutch
# es_config:
# - !include ./x10da_experimental_hutch.yaml
+72
View File
@@ -0,0 +1,72 @@
"""Frontend Absorber"""
from __future__ import annotations
import enum
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class AbsorberError(Exception):
    """Absorber specific exception, raised for frontend-absorber failures."""
class STATUS(int, enum.Enum):
    """Absorber States.

    Integer values mirror the enum of the absorber IOC's STATUS PV,
    so they can be compared directly against ``status.get()``.
    """

    MOVING_CLOSE = 0
    OPEN = 1
    MOVING_OPEN = 2
    CLOSED = 3
    NOT_ENABLED = 4
    TIMEOUT_CLOSE = 5
    TIMEOUT_OPEN = 6
    CLOSE_LS_LOST = 7
    OPEN_LS_LOST = 8
    CLOSE_LS_NOT_FREE = 9
    OPEN_LS_NOT_FREE = 10
    ERROR_LS = 11
    TO_CONNECT = 12
    MAN_OPEN = 13
    UNDEFINED = 14
class Absorber(PSIDeviceBase):
    """Class for the Frontend Absorber.

    Wraps the EPICS absorber IOC behind ``open()``/``close()`` helpers that
    write to the REQUEST PV and return a ``CompareStatus`` tracking the
    STATUS PV until the target state (OPEN / CLOSED) is reached.
    """

    # Methods exposed to interactive (BEC command-line) users.
    USER_ACCESS = ["open", "close"]

    # Writing to REQUEST asks the IOC to change state; the IOC drives the hardware.
    request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
    # Numeric state readback, compared against the STATUS enum values.
    status = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", doc="Absorber Status")
    # Same PV read as its enum string, for human-readable readback.
    status_string = Cpt(EpicsSignalRO, suffix="STATUS", kind="config", string=True, doc="Absorber Status")

    def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
        """Initialize the absorber device.

        Args:
            name: Device name within BEC.
            prefix: EPICS PV prefix of the absorber IOC.
            scan_info: Optional scan metadata handle forwarded to PSIDeviceBase.
            **kwargs: Passed through to PSIDeviceBase.
        """
        super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
        # Seconds allowed for an open/close transition before CompareStatus times out.
        self.timeout_for_move = 10
        # Wait for connection on all components, ensure IOC is connected
        self.wait_for_connection(all_signals=True, timeout=5)

    def open(self) -> DeviceStatus | None:
        """Open the Absorber.

        Returns a status object that completes when STATUS reaches OPEN,
        or ``None`` if the absorber is not currently CLOSED (no-op).
        """
        if self.status.get() == STATUS.CLOSED:
            self.request.put(1)
            status_open = CompareStatus(self.status, STATUS.OPEN, timeout=self.timeout_for_move)
            status = status_open
            return status
        else:
            return None

    def close(self) -> DeviceStatus | None:
        """Close the Absorber.

        Returns a status object that completes when STATUS reaches CLOSED,
        or ``None`` if the absorber is not currently OPEN (no-op).
        """
        if self.status.get() == STATUS.OPEN:
            # NOTE(review): writes the same value (1) to REQUEST as open();
            # presumably the IOC toggles on any request — confirm this is
            # not a copy-paste bug (e.g. close might require put(0)).
            self.request.put(1)
            status_close = CompareStatus(self.status, STATUS.CLOSED, timeout=self.timeout_for_move)
            status = status_close
            return status
        else:
            return None
+466
View File
@@ -0,0 +1,466 @@
"""Module for the Mo1 Bragg positioner of the SuperXAS beamline.
The softIOC is reachable via the EPICS prefix X10DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is a scan controller
to setup XAS scans. A few scan modes are programmed in the controller, e.g. simple and advanced XAS scans.
Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is
used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
TriggerControlMode,
TriggerControlSource,
)
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
# Initialise logger
logger = bec_logger.logger
########### Exceptions ###########
class Mo1BraggError(Exception):
    """Exception for the Mo1 Bragg positioner."""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
    """Dataclass to store the scan parameters for the Mo1 Bragg positioner.

    This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
    ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
    i.e. renaming or adding new parameters, need to be represented here as well.

    All fields default to None so that only the kwargs actually provided by a
    given scan are populated (see Mo1Bragg._update_scan_parameter).
    """

    scan_time: float | None = Field(None, description="Scan time for a half oscillation")
    scan_duration: float | None = Field(None, description="Duration of the scan")
    xrd_enable_low: bool | None = Field(
        None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
    )  # trig_enable_low: bool = None
    xrd_enable_high: bool | None = Field(
        None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
    )  # trig_enable_high: bool = None
    exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
    exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
    cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
    cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
    start: float | None = Field(None, description="Start value for energy/angle")
    stop: float | None = Field(None, description="Stop value for energy/angle")
    p_kink: float | None = Field(None, description="P Kink")
    e_kink: float | None = Field(None, description="Energy Kink")
    # validate_assignment ensures each setattr in _update_scan_parameter is
    # re-validated against the field types.
    model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
    """Mo1 Bragg motor for the SuperXAS beamline.

    The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:

    Combines the BEC device protocol (stage/kickoff/complete/unstage) from
    PSIDeviceBase with the EPICS PV layout of Mo1BraggPositioner to run
    simple and advanced XAS (fly) scans on the controller.
    """

    # Forwards controller scan progress (0-100) to BEC subscribers.
    progress_signal = Cpt(ProgressSignal, name="progress_signal")
    USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]

    def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):  # type: ignore
        """
        Initialize the PSI Device Base class.

        Args:
            name (str) : Name of the device
            prefix (str) : EPICS prefix of the soft IOC
            scan_info (ScanInfo): The scan info to use.
        """
        super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
        # Cached copy of the scan kwargs from BEC; refreshed on every stage.
        self.scan_parameter = ScanParameter()
        # Default timeout (s) for waits on IOC PV transitions.
        self.timeout_for_pvwait = 7.5

    ########################################
    #   Beamline Specific Implementations  #
    ########################################
    def on_init(self) -> None:
        """
        Called when the device is initialized.

        No signals are connected at this point. If you like to
        set default values on signals, please use on_connected instead.
        """

    def on_connected(self) -> None:
        """
        Called after the device is connected and its signals are connected.
        Default values for signals should be set here.
        """
        # Relay controller progress updates to BEC via _progress_update.
        self.scan_control.scan_progress.subscribe(self._progress_update, run=False)

    def on_stage(self) -> DeviceStatus | StatusBase | None:
        """
        Called while staging the device.

        Information about the upcoming scan can be accessed from the scan_info
        (self.scan_info.msg) object.

        Sequence: reset the controller to PENDING if needed, push the scan
        parameters matching the scan name, then trigger a parameter load and
        wait for the controller to report SUCCESS. Unknown scan names are
        ignored (no parameters are loaded).
        """
        # Bring the load-message state machine back to PENDING before loading.
        if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
            status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
            self.cancel_on_stop(status)
            self.scan_control.scan_val_reset.put(1)
            status.wait(timeout=self.timeout_for_pvwait)
        scan_name = self.scan_info.msg.scan_name
        # Pull the scan kwargs from the BEC request into self.scan_parameter.
        self._update_scan_parameter()
        if scan_name == "xas_simple_scan":
            self.set_xas_settings(
                low=self.scan_parameter.start,
                high=self.scan_parameter.stop,
                scan_time=self.scan_parameter.scan_time,
            )
            # XRD triggering disabled for plain XAS scans.
            self.set_trig_settings(
                enable_low=False,
                enable_high=False,
                exp_time_low=0,
                exp_time_high=0,
                cycle_low=0,
                cycle_high=0,
            )
            self.set_scan_control_settings(
                mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
            )
        elif scan_name == "xas_simple_scan_with_xrd":
            self.set_xas_settings(
                low=self.scan_parameter.start,
                high=self.scan_parameter.stop,
                scan_time=self.scan_parameter.scan_time,
            )
            self.set_trig_settings(
                enable_low=self.scan_parameter.xrd_enable_low,  # enable_low=self.scan_parameter.trig_enable_low,
                enable_high=self.scan_parameter.xrd_enable_high,  # enable_high=self.scan_parameter.trig_enable_high,
                exp_time_low=self.scan_parameter.exp_time_low,
                exp_time_high=self.scan_parameter.exp_time_high,
                cycle_low=self.scan_parameter.cycle_low,
                cycle_high=self.scan_parameter.cycle_high,
            )
            self.set_scan_control_settings(
                mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
            )
        elif scan_name == "xas_advanced_scan":
            self.set_advanced_xas_settings(
                low=self.scan_parameter.start,
                high=self.scan_parameter.stop,
                scan_time=self.scan_parameter.scan_time,
                p_kink=self.scan_parameter.p_kink,
                e_kink=self.scan_parameter.e_kink,
            )
            self.set_trig_settings(
                enable_low=False,
                enable_high=False,
                exp_time_low=0,
                exp_time_high=0,
                cycle_low=0,
                cycle_high=0,
            )
            self.set_scan_control_settings(
                mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
            )
        elif scan_name == "xas_advanced_scan_with_xrd":
            self.set_advanced_xas_settings(
                low=self.scan_parameter.start,
                high=self.scan_parameter.stop,
                scan_time=self.scan_parameter.scan_time,
                p_kink=self.scan_parameter.p_kink,
                e_kink=self.scan_parameter.e_kink,
            )
            self.set_trig_settings(
                enable_low=self.scan_parameter.xrd_enable_low,  # enable_low=self.scan_parameter.trig_enable_low,
                enable_high=self.scan_parameter.xrd_enable_high,  # enable_high=self.scan_parameter.trig_enable_high,
                exp_time_low=self.scan_parameter.exp_time_low,
                exp_time_high=self.scan_parameter.exp_time_high,
                cycle_low=self.scan_parameter.cycle_low,
                cycle_high=self.scan_parameter.cycle_high,
            )
            self.set_scan_control_settings(
                mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
            )
        else:
            # Not a Mo1 Bragg scan: skip the parameter load entirely.
            return
        # Setting scan duration seems to lag behind slightly in the backend, include small sleep
        # logger.info(f"Sleeping for one second")
        # time.sleep(1)
        # logger.info(f"Device {self.name}, done sleeping")
        # Load the scan parameters to the controller
        status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
        self.cancel_on_stop(status)
        self.scan_control.scan_load.put(1)
        # Wait for params to be checked from controller
        status.wait(self.timeout_for_pvwait)
        return None

    def on_unstage(self) -> DeviceStatus | StatusBase | None:
        """Called while unstaging the device.

        Clears a pending stop flag and drives the controller's load-message
        state machine back to PENDING, resetting via scan_val_reset when it
        does not return on its own.
        """
        if self.stopped is True:
            logger.warning(f"Resetting stopped in unstage for device {self.name}.")
            self._stopped = False
        if self.scan_control.scan_msg.get() in [
            ScanControlLoadMessage.STARTED,
            ScanControlLoadMessage.SUCCESS,
        ]:
            # Give the controller a short grace period to return to PENDING
            # by itself before forcing a reset.
            status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
            self.cancel_on_stop(status)
            try:
                status.wait(2)
                return None
            except WaitTimeoutError:
                # NOTE(review): the message quotes timeout_for_pvwait but the
                # wait above used 2 s — confirm which value is intended.
                logger.warning(
                    f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
                )
                status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
                self.cancel_on_stop(status)
                self.scan_control.scan_val_reset.put(1)
                status.wait(timeout=self.timeout_for_pvwait)
        else:
            # Any other state: force a reset to PENDING.
            status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
            self.cancel_on_stop(status)
            self.scan_control.scan_val_reset.put(1)
            status.wait(timeout=self.timeout_for_pvwait)
        return None

    def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
        """Called right before the scan starts on all devices automatically."""

    def on_trigger(self) -> DeviceStatus | StatusBase | None:
        """Called when the device is triggered."""

    def on_complete(self) -> DeviceStatus | StatusBase | None:
        """Called to inquire if a device has completed a scan.

        Returns:
            CompareStatus: resolves when the controller's scan_done PV reads 1.
        """
        status = CompareStatus(self.scan_control.scan_done, 1)
        self.cancel_on_stop(status)
        return status

    def on_kickoff(self) -> DeviceStatus | StatusBase | None:
        """Called to kickoff a device for a fly scan. Has to be called explicitly.

        A scan_duration below 0.1 s is treated as "infinite" and started via
        scan_start_infinite; otherwise scan_start_timer is used.

        Returns:
            TransitionStatus: resolves after the controller transitions
            READY -> RUNNING; fails on PARAMETER_WRONG.
        """
        scan_duration = self.scan_control.scan_duration.get()
        # TODO implement better logic for infinite scans, at least bring it up with SuperXAS
        start_func = (
            self.scan_control.scan_start_infinite.put
            if scan_duration < 0.1
            else self.scan_control.scan_start_timer.put
        )
        status = TransitionStatus(
            self.scan_control.scan_status,
            transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
            strict=True,
            failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
        )
        self.cancel_on_stop(status)
        start_func(1)
        return status

    def on_stop(self) -> None:
        """Called when the device is stopped."""
        self.stopped = True  # Needs to be set to stop motion

    ######### Utility Methods #########
    def _progress_update(self, value, **kwargs) -> None:
        """Callback method to update the scan progress, runs a callback
        to SUB_PROGRESS subscribers, i.e. BEC.

        Args:
            value (int) : current progress value
        """
        # Progress PV reports a percentage; done once it hits 100.
        max_value = 100
        self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))

    def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
        """Set XAS parameters for upcoming scan.

        All puts are issued first, then waited on together.

        Args:
            low (float): Low energy/angle value of the scan
            high (float): High energy/angle value of the scan
            scan_time (float): Time for a half oscillation
        """
        status_list = []
        status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
        self.cancel_on_stop(status_list[-1])
        for s in status_list:
            s.wait(timeout=self.timeout_for_pvwait)

    @typechecked
    def convert_angle_energy(
        self, mode: Literal["AngleToEnergy", "EnergyToAngle"], inp: float
    ) -> float:
        """Calculate energy to angle or vice versa using the IOC calculator.

        Args:
            mode (Literal["AngleToEnergy", "EnergyToAngle"]): Mode of calculation
            inp (float): Either angle or energy

        Returns:
            float: Converted angle or energy
        """
        # Reset the calculator (0 then 1) and wait for calc_done to clear.
        self.calculator.calc_reset.put(0)
        self.calculator.calc_reset.put(1)
        status = CompareStatus(self.calculator.calc_done, 0)
        self.cancel_on_stop(status)
        status.wait(self.timeout_for_pvwait)
        if mode == "AngleToEnergy":
            self.calculator.calc_angle.put(inp)
        elif mode == "EnergyToAngle":
            self.calculator.calc_energy.put(inp)
        # Wait for the calculator to signal completion.
        status = CompareStatus(self.calculator.calc_done, 1)
        self.cancel_on_stop(status)
        status.wait(self.timeout_for_pvwait)
        time.sleep(0.25)  # TODO needed still? Needed due to update frequency of softIOC
        if mode == "AngleToEnergy":
            return self.calculator.calc_energy.get()
        elif mode == "EnergyToAngle":
            return self.calculator.calc_angle.get()

    def set_advanced_xas_settings(
        self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
    ) -> None:
        """Set Advanced XAS parameters for upcoming scan.

        Converts the energy inputs to angles, computes the motion spline and
        writes position/velocity/time arrays to the controller.

        Args:
            low (float): Low angle value of the scan in eV
            high (float): High angle value of the scan in eV
            scan_time (float): Time for a half oscillation in s
            p_kink (float): Position of kink in %
            e_kink (float): Energy of kink in eV
        """
        e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
        # Angle and Energy are inverse proportional!
        high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
        low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
        pos, vel, dt = compute_spline(
            low_deg=low_deg,
            high_deg=high_deg,
            p_kink=p_kink,
            e_kink_deg=e_kink_deg,
            scan_time=scan_time,
        )
        status_list = []
        status_list.append(self.scan_settings.a_scan_pos.set(pos))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.a_scan_vel.set(vel))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.a_scan_time.set(dt))
        self.cancel_on_stop(status_list[-1])
        for s in status_list:
            s.wait(timeout=self.timeout_for_pvwait)

    def set_trig_settings(
        self,
        enable_low: bool,
        enable_high: bool,
        exp_time_low: int,
        exp_time_high: int,
        cycle_low: int,
        cycle_high: int,
    ) -> None:
        """Set TRIG settings for the upcoming scan.

        Args:
            enable_low (bool): Enable TRIG for low energy/angle
            enable_high (bool): Enable TRIG for high energy/angle
            exp_time_low (int): Exposure time for low energy/angle
            exp_time_high (int): Exposure time for high energy/angle
            cycle_low (int): Cycle for low energy/angle
            cycle_high (int): Cycle for high energy/angle
        """
        status_list = []
        status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
        self.cancel_on_stop(status_list[-1])
        for s in status_list:
            s.wait(timeout=self.timeout_for_pvwait)

    def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
        """Set the scan control settings for the upcoming scan.

        Args:
            mode (ScanControlMode): Mode for the scan, either simple or advanced
            scan_duration (float): Duration of the scan
        """
        val = ScanControlMode(mode).value
        status_list = []
        status_list.append(self.scan_control.scan_mode_enum.set(val))
        self.cancel_on_stop(status_list[-1])
        status_list.append(self.scan_control.scan_duration.set(scan_duration))
        self.cancel_on_stop(status_list[-1])
        for s in status_list:
            s.wait(timeout=self.timeout_for_pvwait)

    def _update_scan_parameter(self) -> None:
        """Get the scan_info parameters for the scan.

        Copies matching keys from both the positional "inputs" and the
        "kwargs" of the BEC scan request into self.scan_parameter; unknown
        keys are silently ignored.
        """
        for key, value in self.scan_info.msg.request_inputs["inputs"].items():
            if hasattr(self.scan_parameter, key):
                setattr(self.scan_parameter, key, value)
        for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
            if hasattr(self.scan_parameter, key):
                setattr(self.scan_parameter, key, value)
@@ -0,0 +1,20 @@
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
class Mo1BraggAngle(Mo1BraggPositioner):
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
@property
def egu(self) -> str:
"""Return the engineering unit of the positioner."""
return "deg"
@@ -0,0 +1,407 @@
"""Module for the Mo1 Bragg positioner"""
import threading
import time
import traceback
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
PositionerBase,
Signal,
)
from ophyd.utils import LimitError
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import MoveType
# Initialise logger
logger = bec_logger.logger
############# Exceptions #############
class Mo1BraggStoppedError(Exception):
    """Exception to raise when the Bragg positioner is stopped during a move."""
############# Signal classes #############
class MoveTypeSignal(Signal):
    """Custom Signal to set the move type of the Bragg positioner."""

    # pylint: disable=arguments-differ
    def set(self, value: str | MoveType) -> None:
        """Set the currently active move method.

        NOTE(review): this writes _readback directly (bypassing Signal.put,
        so no subscriptions fire) and returns None instead of a Status —
        confirm callers do not wait on the returned value.

        Args:
            value (str | MoveType) : Can be either 'energy' or 'angle'
        """
        # MoveType is a str enum, so .lower() works for both plain strings
        # and MoveType members; invalid values raise ValueError here.
        value = MoveType(value.lower())
        self._readback = value.value
############# Utility devices to separate the namespace #############
class Mo1BraggStatus(Device):
    """Mo1 Bragg PVs for status monitoring (read-only health/heartbeat flags)."""

    error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
    brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
    mot_commutated = Cpt(
        EpicsSignalRO, suffix="mot_commutated_RBV", kind="config", auto_monitor=True
    )
    axis_enabled = Cpt(EpicsSignalRO, suffix="axis_enabled_RBV", kind="config", auto_monitor=True)
    heartbeat = Cpt(EpicsSignalRO, suffix="heartbeat_RBV", kind="config", auto_monitor=True)
class Mo1BraggCrystal(Device):
    """Mo1 Bragg PVs to set the crystal parameters (Si(111) / Si(311))."""

    bragg_off_si111 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si111", kind="config")
    bragg_off_si311 = Cpt(EpicsSignalWithRBV, suffix="bragg_off_si311", kind="config")
    xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
    d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
    d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
    # Action PV: applies the configured offsets; put_complete so the write
    # only finishes once the IOC has processed it.
    set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
    current_d_spacing = Cpt(
        EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
    )
    current_bragg_off = Cpt(
        EpicsSignalRO, suffix="current_bragg_off_RBV", kind="normal", auto_monitor=True
    )
    current_xtal = Cpt(
        EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
    )
    # Same PV as current_xtal but decoded as the enum string.
    current_xtal_string = Cpt(
        EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
    )
class Mo1BraggScanSettings(Device):
    """Mo1 Bragg PVs to set the scan settings (TRIG, simple and advanced XAS)."""

    # TRIG settings
    trig_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="trig_select_ref_ENUM", kind="config")
    trig_ena_hi_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_hi_ENUM", kind="config")
    trig_time_hi = Cpt(EpicsSignalWithRBV, suffix="trig_time_hi", kind="config")
    trig_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_hi", kind="config")
    trig_ena_lo_enum = Cpt(EpicsSignalWithRBV, suffix="trig_ena_lo_ENUM", kind="config")
    trig_time_lo = Cpt(EpicsSignalWithRBV, suffix="trig_time_lo", kind="config")
    trig_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="trig_every_n_lo", kind="config")
    # XAS simple scan settings
    s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
    s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
    s_scan_energy_lo = Cpt(
        EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
    )
    s_scan_energy_hi = Cpt(
        EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
    )
    s_scan_scantime = Cpt(
        EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
    )
    # XAS advanced scan settings (spline arrays: position, velocity, delta-t)
    a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
    a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
    a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1TriggerSettings(Device):
    """Mo1 Trigger settings.

    One source/mode/length/request quadruple per trigger output
    (XRD, Falcon, and two universal outputs).
    """

    settle_time = Cpt(EpicsSignalWithRBV, suffix="settle_time", kind="config")
    max_dev = Cpt(EpicsSignalWithRBV, suffix="max_dev", kind="config")
    xrd_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_src_ENUM", kind="config")
    xrd_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_mode_ENUM", kind="config")
    xrd_trig_len = Cpt(EpicsSignalWithRBV, suffix="xrd_trig_len", kind="config")
    xrd_trig_req = Cpt(EpicsSignal, suffix="xrd_trig_req", kind="config")
    falcon_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_src_ENUM", kind="config")
    falcon_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_mode_ENUM", kind="config")
    falcon_trig_len = Cpt(EpicsSignalWithRBV, suffix="falcon_trig_len", kind="config")
    falcon_trig_req = Cpt(EpicsSignal, suffix="falcon_trig_req", kind="config")
    univ1_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_src_ENUM", kind="config")
    univ1_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_mode_ENUM", kind="config")
    univ1_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ1_trig_len", kind="config")
    univ1_trig_req = Cpt(EpicsSignal, suffix="univ1_trig_req", kind="config")
    univ2_trig_src_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_src_ENUM", kind="config")
    univ2_trig_mode_enum = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_mode_ENUM", kind="config")
    univ2_trig_len = Cpt(EpicsSignalWithRBV, suffix="univ2_trig_len", kind="config")
    univ2_trig_req = Cpt(EpicsSignal, suffix="univ2_trig_req", kind="config")
class Mo1BraggCalculator(Device):
    """Mo1 Bragg PVs to convert angle to energy or vice-versa.

    Usage (see Mo1Bragg.convert_angle_energy): reset, write one input PV,
    wait for calc_done, then read the other PV.
    """

    calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
    calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
    calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
    calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
class Mo1BraggScanControl(Device):
    """Mo1 Bragg PVs to control the scan after setting the parameters."""

    scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
    scan_duration = Cpt(
        EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
    )
    # Action PVs use put_complete so the put only finishes once processed.
    scan_load = Cpt(EpicsSignal, suffix="scan_load", kind="config", put_complete=True)
    # Load-message state machine; values map to ScanControlLoadMessage.
    scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
    scan_start_infinite = Cpt(
        EpicsSignal, suffix="scan_start_infinite", kind="config", put_complete=True
    )
    scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="config", put_complete=True)
    scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="config", put_complete=True)
    # Scan status; values map to ScanControlScanStatus.
    scan_status = Cpt(
        EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
    )
    scan_time_left = Cpt(
        EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
    )
    scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
    scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config", put_complete=True)
    scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
    scan_spectra_done = Cpt(
        EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
    )
    scan_spectra_left = Cpt(
        EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
    )
class Mo1BraggPositioner(Device, PositionerBase):
    """
    Positioner implementation with readback energy of the MO1 Bragg positioner.

    The prefix to connect to the soft IOC is X10DA-OP-MO1:BRAGG:
    This soft IOC connects to the NI motor and its control loop.

    Moves run in a background thread (see move/_move_and_finish) and are
    tracked via a DeviceStatus.
    """

    USER_ACCESS = ["set_xtal"]

    ####### Sub-components ########
    # Namespace is cleaner and easier to maintain
    crystal = Cpt(Mo1BraggCrystal, "")
    scan_settings = Cpt(Mo1BraggScanSettings, "")
    trigger_settings = Cpt(Mo1TriggerSettings, "")
    calculator = Cpt(Mo1BraggCalculator, "")
    scan_control = Cpt(Mo1BraggScanControl, "")
    status = Cpt(Mo1BraggStatus, "")

    ############# Energy PVs #############
    readback = Cpt(
        EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
    )
    setpoint = Cpt(
        EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
    )
    # NOTE(review): backed by move_abs_done_RBV — the value reads as a
    # "move done" flag, not "is moving"; confirm the PV semantics.
    motor_is_moving = Cpt(
        EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
    )
    low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
    high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
    velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)

    ########## Move Command PVs ##########
    move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
    move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config", put_complete=True)

    # ophyd subscription names
    SUB_READBACK = "readback"
    _default_sub = SUB_READBACK
    SUB_PROGRESS = "progress"

    def __init__(self, prefix="", *, name: str, **kwargs):
        """Initialize the Mo1 Bragg positioner.

        Args:
            prefix (str): EPICS prefix for the device
            name (str): Name of the device
            kwargs: Additional keyword arguments
        """
        super().__init__(prefix, name=name, **kwargs)
        # Background thread driving the current move, if any.
        self._move_thread = None
        # Set by stop(); polled by the move thread to abort.
        self._stopped = False
        # Report the readback under the device name (ophyd convention).
        self.readback.name = self.name

    def stop(self, *, success=False) -> None:
        """Stop any motion on the positioner.

        Args:
            success (bool) : Flag to indicate if the motion was successful
        """
        self.move_stop.put(1)
        self._stopped = True
        super().stop(success=success)

    def stop_scan(self) -> None:
        """Stop the currently running scan gracefully, this finishes the running oscillation."""
        self.scan_control.scan_stop.put(1)

    @property
    def stopped(self) -> bool:
        """Return True if the positioner was stopped since the last move started."""
        return self._stopped

    ######### Positioner specific methods #########
    @property
    def limits(self) -> tuple:
        """Return (low, high) limits of the Bragg positioner, read from the IOC."""
        return (self.low_lim.get(), self.high_lim.get())

    @property
    def low_limit(self) -> float:
        """Return low limit of axis"""
        return self.limits[0]

    @property
    def high_limit(self) -> float:
        """Return high limit of axis"""
        return self.limits[1]

    @property
    def egu(self) -> str:
        """Return the engineering units of the positioner"""
        return "eV"

    @property
    def position(self) -> float:
        """Return the current position of Mo1Bragg, considering the move type"""
        return self.readback.get()

    # pylint: disable=arguments-differ
    def check_value(self, value: float) -> None:
        """Method to check if a value is within limits of the positioner.
        Called by PositionerBase.move()

        Limits are only enforced when low < high; equal/inverted limits
        disable the check.

        Args:
            value (float) : value to move axis to.

        Raises:
            LimitError: if the value is outside the active limits.
        """
        low_limit, high_limit = self.limits
        if low_limit < high_limit and not low_limit <= value <= high_limit:
            raise LimitError(f"position={value} not within limits {self.limits}")

    def _move_and_finish(
        self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
    ) -> None:
        """
        Method to be called in the move thread to move the Bragg positioner
        to the target position. Marks `status` finished on success, or sets
        the raised exception on it (including Mo1BraggStoppedError on stop).

        Args:
            target_pos (float) : target position for the motion
            status (DeviceStatus) : status object to set the status of the motion
            update_frequency (float): Optional, polling interval (s) for the
                                      move-done flag, defaults to 0.1s
        """
        motor_name = None
        try:
            # Set the target position on IOC
            self.setpoint.put(target_pos)
            self.move_abs.put(1)
            # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
            time.sleep(0.5)
            motor_name = self.name
            # Poll until the IOC reports the absolute move done (flag != 0),
            # aborting if stop() was called in the meantime.
            while self.motor_is_moving.get() == 0:
                if self.stopped:
                    raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
                time.sleep(update_frequency)
            # pylint: disable=protected-access
            status.set_finished()
        # pylint: disable=broad-except
        except Exception as exc:
            content = traceback.format_exc()
            logger.error(
                f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
            )
            status.set_exception(exc=exc)

    def move(self, value: float, **kwargs) -> DeviceStatus:
        """
        Move the Bragg positioner to the specified value.

        NOTE(review): docstring previously advertised a move_type kwarg;
        kwargs are currently ignored — confirm whether move-type switching
        is still planned here.

        Args:
            value (float) : target value for the motion

        Returns:
            DeviceStatus : status object to track the motion
        """
        self._stopped = False
        self.check_value(value)
        status = DeviceStatus(device=self)
        # Run the blocking poll loop off the caller's thread.
        self._move_thread = threading.Thread(
            target=self._move_and_finish, args=(value, status, 0.1)
        )
        self._move_thread.start()
        return status

    # -------------- End of Positioner specific methods -----------------#
    # -------------- MO1 Bragg specific methods -----------------#
    def set_xtal(
        self,
        xtal_enum: Literal["111", "311"],
        bragg_off_si111: float | None = None,
        bragg_off_si311: float | None = None,
        d_spacing_si111: float | None = None,
        d_spacing_si311: float | None = None,
    ) -> None:
        """Method to set the crystal parameters of the Bragg positioner.

        Only the parameters that are given (not None) are written; finally
        the selected crystal is applied via xtal_enum and set_offset.

        Args:
            xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
            bragg_off_si111 (float) : Offset for the 111 crystal
            bragg_off_si311 (float) : Offset for the 311 crystal
            d_spacing_si111 (float) : d-spacing for the 111 crystal
            d_spacing_si311 (float) : d-spacing for the 311 crystal

        Raises:
            ValueError: if xtal_enum is neither "111" nor "311".
        """
        if bragg_off_si111 is not None:
            self.crystal.bragg_off_si111.put(bragg_off_si111)
        if bragg_off_si311 is not None:
            self.crystal.bragg_off_si311.put(bragg_off_si311)
        if d_spacing_si111 is not None:
            self.crystal.d_spacing_si111.put(d_spacing_si111)
        if d_spacing_si311 is not None:
            self.crystal.d_spacing_si311.put(d_spacing_si311)
        if xtal_enum == "111":
            crystal_set = 0
        elif xtal_enum == "311":
            crystal_set = 1
        else:
            raise ValueError(
                f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
            )
        self.crystal.xtal_enum.put(crystal_set)
        # Apply the offset for the newly selected crystal.
        self.crystal.set_offset.put(1)
@@ -0,0 +1,61 @@
"""Enums for the Bragg positioner and trigger generator"""
import enum
class TriggerControlSource(int, enum.Enum):
    """Enum class for the trigger control source of the trigger generator.

    Values must match the IOC enum PV; do not renumber.
    """

    EPICS = 0
    INPOS = 1
class TriggerControlMode(int, enum.Enum):
    """Enum class for the trigger control mode of the trigger generator.

    Values must match the IOC enum PV; do not renumber.
    """

    PULSE = 0
    CONDITION = 1
class ScanControlScanStatus(int, enum.Enum):
    """Enum class for the scan status of the Bragg positioner.

    Mirrors the scan_status_ENUM_RBV PV; do not renumber.
    """

    PARAMETER_WRONG = 0
    VALIDATION_PENDING = 1
    READY = 2
    RUNNING = 3
class ScanControlLoadMessage(int, enum.Enum):
    """Enum for validating messages for load message of the Bragg positioner"""

    # Progress states of a parameter load.
    PENDING = 0
    STARTED = 1
    SUCCESS = 2
    # Trigger-parameter validation errors (ERR_TRIG_*).
    ERR_TRIG_MEAS_LEN_LOW = 3
    ERR_TRIG_N_TRIGGERS_LOW = 4
    ERR_TRIG_TRIGS_EVERY_N_LOW = 5
    ERR_TRIG_MEAS_LEN_HI = 6
    ERR_TRIG_N_TRIGGERS_HI = 7
    ERR_TRIG_TRIGS_EVERY_N_HI = 8
    # Scan-parameter validation errors (ERR_SCAN_*).
    ERR_SCAN_HI_ANGLE_LIMIT = 9
    ERR_SCAN_LOW_ANGLE_LIMITS = 10
    ERR_SCAN_TIME = 11
    ERR_SCAN_VEL_TOO_HI = 12
    ERR_SCAN_ANGLE_OUT_OF_LIM = 13
    ERR_SCAN_HIGH_VEL_LAR_42 = 14
    ERR_SCAN_MODE_INVALID = 15
class MoveType(str, enum.Enum):
    """Enum class to switch between move types energy and angle for the Bragg positioner"""

    # str mix-in: members compare equal to the plain strings "energy"/"angle".
    ENERGY = "energy"
    ANGLE = "angle"
class ScanControlMode(int, enum.Enum):
    """Enum class for the scan control mode of the Bragg positioner"""

    # SIMPLE and ADVANCED correspond to the two scan flavors used elsewhere
    # in this package (simple vs. advanced/spline scans).
    SIMPLE = 0
    ADVANCED = 1
@@ -0,0 +1,93 @@
"""Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline
################ Define Constants ############
SAFETY_FACTOR = 0.025  # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41  # number of samples to generate -> always choose an uneven number,
# otherwise the peak value will not be included
DEGREE_SPLINE = 3  # degree of the spline, 3 works well
TIME_COMPENSATE_SPLINE = 0.0062  # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02  # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
# NOTE(review): constant name has a typo (COMPONSATION); kept because other
# modules may reference it.


class Mo1UtilsSplineError(Exception):
    """Exception raised when the spline for an advanced scan cannot be computed."""


def compute_spline(
    low_deg: float, high_deg: float, p_kink: float, e_kink_deg: float, scan_time: float
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Spline computation for the advanced scan mode.

    Args:
        low_deg (float): Low angle value of the scan in deg
        high_deg (float): High angle value of the scan in deg
        p_kink (float): Position of kink in %
        e_kink_deg (float): Position of kink in degree
        scan_time (float): Time for a half oscillation in s

    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray]: Position (deg), velocity
        (deg/s) and delta-t (ms) arrays sampled along the spline.

    Raises:
        Mo1UtilsSplineError: If p_kink is outside [0, 100] or e_kink_deg is
            outside the (extended) scan range.
    """
    # Increase motion range slightly so that XAS trigger signals will occur
    # at the defined energy limits.
    low_deg = low_deg - POSITION_COMPONSATION
    high_deg = high_deg + POSITION_COMPONSATION
    if not 0 <= p_kink <= 100:
        raise Mo1UtilsSplineError(
            "Kink position not within range of [0..100%] " + f"for p_kink: {p_kink}"
        )
    if not low_deg < e_kink_deg < high_deg:
        raise Mo1UtilsSplineError(
            "Kink energy not within selected energy range of scan, "
            + f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and "
            + f"high_deg {high_deg}."
        )
    # Time compensation scaled to the requested scan time.
    tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
    t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2 * (SAFETY_FACTOR - tc1)) * p_kink / 100 + (
        SAFETY_FACTOR - tc1
    )
    # Control points: flat start, kink, flat end (time vs. relative position).
    t_input = [
        0,
        SAFETY_FACTOR - tc1,
        t_kink,
        scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
        scan_time - TIME_COMPENSATE_SPLINE,
    ]
    p_input = [0, 0, e_kink_deg - low_deg, high_deg - low_deg, high_deg - low_deg]
    cv = np.stack((t_input, p_input)).T  # spline control points
    max_param = len(cv) - DEGREE_SPLINE
    kv = np.clip(np.arange(len(cv) + DEGREE_SPLINE + 1) - DEGREE_SPLINE, 0, max_param)  # knots
    spl = BSpline(kv, cv, DEGREE_SPLINE)  # get spline function
    u = np.linspace(0, max_param, N_SAMPLES)
    tim, pos = spl(u).T
    d1 = spl(u, 1)  # first derivative (dt/du, dp/du)
    pos = pos + low_deg
    # Chain rule: velocity dp/dt = (dp/du) / (dt/du).
    vel = d1[:, 1] / d1[:, 0]
    # Time differences in ms, with dt[0] = 0 by convention.
    dt = np.concatenate(([0.0], 1000 * np.diff(tim)))
    return pos, vel, dt
+706
View File
@@ -0,0 +1,706 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from superxas_bec.devices.nidaq.nidaq_enums import (
EncoderFactors,
NIDAQCompression,
NidaqState,
ReadoutRange,
ScanRates,
ScanType,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class NidaqError(Exception):
    """Base exception for NIDAQ device specific errors."""
class NidaqControl(Device):
    """Nidaq control class with all PVs"""
    # --- Soft signals (SetableSignal) populated by the NIDAQ backend stream,
    # --- not EPICS PVs. ---
    energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
    smpl_abs = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
    )
    ref_abs = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
    )
    cisum = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
    )
    ai0_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
    )
    ai1_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, MEAN"
    )
    ai2_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, MEAN"
    )
    ai3_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, MEAN"
    )
    ai4_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, MEAN"
    )
    ai5_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, MEAN"
    )
    ai6_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, MEAN"
    )
    ai7_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, MEAN"
    )
    di0_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 0, MAX")
    di1_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 1, MAX")
    di2_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 2, MAX")
    di3_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 3, MAX")
    di4_max = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream digital input 4, MAX")
    ci0_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0, MEAN"
    )
    ci1_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1, MEAN"
    )
    ci2_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2, MEAN"
    )
    ci3_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3, MEAN"
    )
    ci4_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4, MEAN"
    )
    ci5_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5, MEAN"
    )
    ci6_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6, MEAN"
    )
    ci7_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7, MEAN"
    )
    ci8_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8, MEAN"
    )
    ci9_mean = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9, MEAN"
    )
    ### Readback PVs for EpicsEmitter ###
    ai0 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI0",
        kind=Kind.normal,
        doc="EPICS analog input 0",
        auto_monitor=True,
    )
    ai1 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI1",
        kind=Kind.normal,
        doc="EPICS analog input 1",
        auto_monitor=True,
    )
    ai2 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI2",
        kind=Kind.normal,
        doc="EPICS analog input 2",
        auto_monitor=True,
    )
    ai3 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI3",
        kind=Kind.normal,
        doc="EPICS analog input 3",
        auto_monitor=True,
    )
    ai4 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI4",
        kind=Kind.normal,
        doc="EPICS analog input 4",
        auto_monitor=True,
    )
    ai5 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI5",
        kind=Kind.normal,
        doc="EPICS analog input 5",
        auto_monitor=True,
    )
    ai6 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI6",
        kind=Kind.normal,
        doc="EPICS analog input 6",
        auto_monitor=True,
    )
    ai7 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-AI7",
        kind=Kind.normal,
        doc="EPICS analog input 7",
        auto_monitor=True,
    )
    ci0 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI0",
        kind=Kind.normal,
        doc="EPICS counter input 0",
        auto_monitor=True,
    )
    ci1 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI1",
        kind=Kind.normal,
        doc="EPICS counter input 1",
        auto_monitor=True,
    )
    ci2 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI2",
        kind=Kind.normal,
        doc="EPICS counter input 2",
        auto_monitor=True,
    )
    ci3 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI3",
        kind=Kind.normal,
        doc="EPICS counter input 3",
        auto_monitor=True,
    )
    ci4 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI4",
        kind=Kind.normal,
        doc="EPICS counter input 4",
        auto_monitor=True,
    )
    ci5 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI5",
        kind=Kind.normal,
        doc="EPICS counter input 5",
        auto_monitor=True,
    )
    ci6 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI6",
        kind=Kind.normal,
        doc="EPICS counter input 6",
        auto_monitor=True,
    )
    ci7 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI7",
        kind=Kind.normal,
        doc="EPICS counter input 7",
        auto_monitor=True,
    )
    ci8 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI8",
        kind=Kind.normal,
        doc="EPICS counter input 8",
        auto_monitor=True,
    )
    ci9 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-CI9",
        kind=Kind.normal,
        doc="EPICS counter input 9",
        auto_monitor=True,
    )
    di0 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-DI0",
        kind=Kind.normal,
        doc="EPICS digital input 0",
        auto_monitor=True,
    )
    di1 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-DI1",
        kind=Kind.normal,
        doc="EPICS digital input 1",
        auto_monitor=True,
    )
    di2 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-DI2",
        kind=Kind.normal,
        doc="EPICS digital input 2",
        auto_monitor=True,
    )
    di3 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-DI3",
        kind=Kind.normal,
        doc="EPICS digital input 3",
        auto_monitor=True,
    )
    di4 = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-DI4",
        kind=Kind.normal,
        doc="EPICS digital input 4",
        auto_monitor=True,
    )
    enc_epics = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-ENC",
        kind=Kind.normal,
        doc="EPICS Encoder reading",
        auto_monitor=True,
    )
    energy_epics = Cpt(
        EpicsSignalRO,
        suffix="NIDAQ-ENERGY",
        kind=Kind.normal,
        doc="EPICS Energy reading",
        auto_monitor=True,
    )
    ### Readback for BEC emitter ###
    # NOTE(review): the *_std_dev counter doc strings below use '.' where the
    # analog ones use ',' — cosmetic inconsistency only.
    ai0_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, STD"
    )
    ai1_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 1, STD"
    )
    ai2_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 2, STD"
    )
    ai3_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 3, STD"
    )
    ai4_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 4, STD"
    )
    ai5_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 5, STD"
    )
    ai6_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 6, STD"
    )
    ai7_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 7, STD"
    )
    ci0_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 0. STD"
    )
    ci1_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 1. STD"
    )
    ci2_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 2. STD"
    )
    ci3_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 3. STD"
    )
    ci4_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 4. STD"
    )
    ci5_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 5. STD"
    )
    ci6_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 6. STD"
    )
    ci7_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 7. STD"
    )
    ci8_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 8. STD"
    )
    ci9_std_dev = Cpt(
        SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter input 9. STD"
    )
    xas_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XAS timestamp")
    # xrd_timestamp = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD timestamp")
    # xrd_angle = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD angle")
    # xrd_energy = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD energy")
    # xrd_ai0_mean = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 mean")
    # xrd_ai0_std_dev = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream XRD ai0 std dev")
    # enc: encoder reading from the stream; rle: presumably run-length-encoding
    # related (cf. NIDAQ-EnableRLE below) — TODO confirm with the backend.
    enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
    rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
    ### Control PVs ###
    # The *_string components below read the SAME PV as their numeric twin but
    # with string=True, yielding the human-readable enum label.
    enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
    # enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
    kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
    stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
    state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config, auto_monitor=True)
    server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
    compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
    scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
    scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
    sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
    sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
    scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
    readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
    readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
    encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
    encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
    stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
    power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
    heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
    time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
    # Channel-selection bitmasks (see Nidaq.set_config).
    ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
    ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
    di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
    add_chans = Cpt(EpicsSignal, suffix="NIDAQ-AddChans", kind=Kind.config, auto_monitor=True)
    # Absorption-calculation configuration (numerator/denominator selection).
    smpl_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_ln", kind=Kind.config, auto_monitor=True)
    ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
    smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
    smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
    smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
    smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
    ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
    ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
    ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
    ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
class Nidaq(PSIDeviceBase, NidaqControl):
    """NIDAQ ophyd wrapper around the NIDAQ backend currently running at x10da-nidaq-01

    Args:
        prefix (str) : Prefix to the NIDAQ soft ioc, currently X10DA-CPCL-SCANSERVER:
        name (str) : Name of the device
        scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
    """

    progress_signal = Cpt(ProgressSignal, name="progress_signal")

    USER_ACCESS = ["set_config"]

    # Translation tables from user-facing ``set_config`` arguments to the
    # integer enums written to the backend PVs.
    _SAMPLING_RATE_MAP = {
        100000: ScanRates.HUNDRED_KHZ,
        500000: ScanRates.FIVE_HUNDRED_KHZ,
        1000000: ScanRates.ONE_MHZ,
        2000000: ScanRates.TWO_MHZ,
        4000000: ScanRates.FOUR_MHZ,
        5000000: ScanRates.FIVE_MHZ,
        10000000: ScanRates.TEN_MHZ,
        14286000: ScanRates.FOURTEEN_THREE_MHZ,
    }
    _READOUT_RANGE_MAP = {
        1: ReadoutRange.ONE_V,
        2: ReadoutRange.TWO_V,
        5: ReadoutRange.FIVE_V,
        10: ReadoutRange.TEN_V,
    }
    # Accept both the fraction-style values checked by the original
    # implementation and the "X_*" values documented in ``set_config``.
    _ENCODER_FACTOR_MAP = {
        "1/16": EncoderFactors.X1_16,
        "1/8": EncoderFactors.X1_8,
        "1/4": EncoderFactors.X1_4,
        "1/2": EncoderFactors.X1_2,
        "1": EncoderFactors.X1,
        "2": EncoderFactors.X2,
        "4": EncoderFactors.X4,
        "X_1": EncoderFactors.X1,
        "X_2": EncoderFactors.X2,
        "X_4": EncoderFactors.X4,
    }

    def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
        super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
        self.scan_info: ScanInfo
        # Timeout for waiting on backend state transitions (heartbeat, FSM state).
        self.timeout_wait_for_signal = 5  # put 5s firsts
        # Timeout for individual PV set calls. Edited due to timeout issues persisting.
        self._timeout_wait_for_pv = 5
        # Scans for which the NIDAQ backend integration is operational.
        self.valid_scan_names = [
            "xas_simple_scan",
            "xas_simple_scan_with_xrd",
            "xas_advanced_scan",
            "xas_advanced_scan_with_xrd",
            "nidaq_continuous_scan",
        ]

    ########################################
    #           Beamline Methods           #
    ########################################

    def _check_if_scan_name_is_valid(self) -> bool:
        """Check if the scan is within the list of scans for which the backend is working"""
        return self.scan_info.msg.scan_name in self.valid_scan_names

    @staticmethod
    def _channel_mask(channels: list, max_channel: int) -> int:
        """Build a channel-selection bitmask from a list of channel numbers.

        Non-integer entries and channels outside [0, max_channel] are silently
        ignored, mirroring the lenient behavior of the original implementation.
        """
        mask = 0
        if isinstance(channels, list):
            for ch in channels:
                if isinstance(ch, int) and 0 <= ch <= max_channel:
                    mask |= 1 << ch
        return mask

    def set_config(
        self,
        sampling_rate: Literal[
            100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000
        ],
        ai: list,
        ci: list,
        di: list,
        scan_type: Literal["continuous", "triggered"] = "triggered",
        scan_duration: float = 0,
        readout_range: Literal[1, 2, 5, 10] = 10,
        encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4",
        enable_compression: bool = True,
    ) -> None:
        """Method to configure the NIDAQ

        Args:
            sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
                10000000, 14286000]): Sampling rate in Hz
            ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
                input 0, 1 and 2
            ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
                input 0, 1 and 2
            di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
                input 0, 1 and 2
            scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
                otherwise continuous, default 'triggered'
            scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
            readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
            encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4';
                fraction strings ('1/16' .. '4') are accepted as well
            enable_compression(bool): Enable or disable compression of data, default True
        """
        rate = self._SAMPLING_RATE_MAP.get(sampling_rate)
        if rate is not None:
            self.sampling_rate.put(rate)
        # AI/CI support channels 0..7, DI supports channels 0..4.
        self.ai_chans.put(self._channel_mask(ai, max_channel=7))
        self.ci_chans.put(self._channel_mask(ci, max_channel=7))
        self.di_chans.put(self._channel_mask(di, max_channel=4))
        # Bugfix: the previous substring checks (``scan_type in "continuous"``)
        # also matched partial strings; compare for equality instead.
        if scan_type == "continuous":
            self.scan_type.put(ScanType.CONTINUOUS)
        elif scan_type == "triggered":
            self.scan_type.put(ScanType.TRIGGERED)
        if scan_duration >= 0:
            self.scan_duration.put(scan_duration)
        readout = self._READOUT_RANGE_MAP.get(readout_range)
        if readout is not None:
            self.readout_range.put(readout)
        # Bugfix: the documented default 'X_4' never matched the previous
        # fraction-string substring checks, so the encoder factor was
        # silently left unchanged.
        factor = self._ENCODER_FACTOR_MAP.get(encoder_type)
        if factor is not None:
            self.encoder_factor.put(factor)
        self.enable_compression.put(
            NIDAQCompression.ON if enable_compression else NIDAQCompression.OFF
        )

    ########################################
    #   Beamline Specific Implementations  #
    ########################################

    def on_init(self) -> None:
        """
        Called when the device is initialized.

        No signals are connected at this point. If you like to
        set default values on signals, please use on_connected instead.
        """

    def on_connected(self) -> None:
        """
        Called after the device is connected and its signals are connected.
        Default values for signals should be set here.
        """
        # Wait for a 0 -> 1 heartbeat transition to confirm the backend is alive.
        status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
        self.cancel_on_stop(status)
        try:
            status.wait(timeout=self.timeout_wait_for_signal)  # Raises if timeout is reached
        except WaitTimeoutError:
            # No heartbeat seen: try powering the backend on and wait again.
            logger.warning(f"Device {self.name} was not alive, trying to put power on")
            status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
            self.cancel_on_stop(status)
            self.power.put(1)
            status.wait(timeout=self.timeout_wait_for_signal)
        status = CompareStatus(self.state, NidaqState.STANDBY)
        self.cancel_on_stop(status)
        status.wait(timeout=self.timeout_wait_for_signal)
        self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
        self.time_left.subscribe(self._progress_update, run=False)

    def on_stage(self) -> DeviceStatus | StatusBase | None:
        """
        Called while staging the device.

        Information about the upcoming scan can be accessed from the scan_info
        (self.scan_info.msg) object. If the upcoming scan is not in the list of
        valid scans, return immediately.
        """
        if not self._check_if_scan_name_is_valid():
            return None
        # Bring the backend back to STANDBY if a previous scan left it running.
        if self.state.get() != NidaqState.STANDBY:
            status = CompareStatus(self.state, NidaqState.STANDBY)
            self.cancel_on_stop(status)
            self.on_stop()
            status.wait(timeout=self.timeout_wait_for_signal)
        # Triggered (mono-synchronized) scans run indefinitely with compression;
        # the continuous scan takes duration/compression from the scan parameters.
        if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
            self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
            self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
            self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
        else:
            self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
            self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
                timeout=self._timeout_wait_for_pv
            )
            self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
                timeout=self._timeout_wait_for_pv
            )
        # Stage call to IOC
        status = CompareStatus(self.state, NidaqState.STAGE)
        self.cancel_on_stop(status)
        # TODO 11.11.25/HS64
        # Switched from set to put in the hope to get rid of the rare event where nidaq is stopped at the start of a scan
        # Problems consistently persisting, testing changing back to set, unconvinced this is the actual cause 14.11.25/AHC
        # self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
        self.stage_call.put(1)
        status.wait(timeout=self.timeout_wait_for_signal)
        # Triggered scans are kicked off already while staging.
        if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
            status = self.on_kickoff()
            self.cancel_on_stop(status)
            status.wait(timeout=self._timeout_wait_for_pv)
        logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")

    def on_kickoff(self) -> DeviceStatus | StatusBase:
        """Kickoff the Nidaq"""
        status = self.kickoff_call.set(1)
        self.cancel_on_stop(status)
        return status

    def on_unstage(self) -> DeviceStatus | StatusBase | None:
        """Called while unstaging the device. Check that the Nidaq goes into Standby"""
        status = CompareStatus(self.state, NidaqState.STANDBY)
        self.cancel_on_stop(status)
        status.wait(timeout=self.timeout_wait_for_signal)
        # Re-enable compression as the default for subsequent scans.
        status = self.enable_compression.set(1)
        self.cancel_on_stop(status)
        status.wait(self._timeout_wait_for_pv)
        logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")

    def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
        """
        Called right before the scan starts on all devices automatically.

        Here we ensure that the NIDAQ master task is running
        before the motor starts its oscillation. This is needed for being properly homed.
        The NIDAQ should go into Acquiring mode.
        """
        if not self._check_if_scan_name_is_valid():
            return None
        if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
            logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
            return None
        status = CompareStatus(self.state, NidaqState.KICKOFF)
        self.cancel_on_stop(status)
        status.wait(timeout=self._timeout_wait_for_pv)
        logger.info(
            f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
        )

    def on_trigger(self) -> DeviceStatus | StatusBase | None:
        """Called when the device is triggered."""

    def on_complete(self) -> DeviceStatus | StatusBase | None:
        """
        Called to inquire if a device has completed a scans.

        For the NIDAQ we use this method to stop the backend since it
        would not stop by itself in its current implementation since the number of points are not predefined.
        """
        if not self._check_if_scan_name_is_valid():
            return None
        status = CompareStatus(self.state, NidaqState.STANDBY)
        self.cancel_on_stop(status)
        if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
            self.on_stop()
        return status

    def _progress_update(self, value, **kwargs) -> None:
        """Callback method to update the scan progress, runs a callback
        to SUB_PROGRESS subscribers, i.e. BEC.

        Args:
            value (int) : current progress value (time left in s)
        """
        scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
        if not isinstance(scan_duration, (int, float)):
            return
        # time_left counts down; report elapsed time against the full duration.
        value = scan_duration - value
        max_value = scan_duration
        self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))

    def on_stop(self) -> None:
        """Called when the device is stopped."""
        self.stop_call.put(1)
+60
View File
@@ -0,0 +1,60 @@
import enum
class NIDAQCompression(int, enum.Enum):
    """Options for the NIDAQ run-length-encoding compression (NIDAQ-EnableRLE PV).

    Declared with an ``int`` mix-in so the members compare equal to, and are
    written as, the raw integers 0/1. The previous ``str`` mix-in silently
    turned the values into the strings "0"/"1", which is inconsistent with the
    plain-integer ``enable_compression.set(1)`` calls used elsewhere.
    """

    OFF = 0
    ON = 1
class ScanType(int, enum.Enum):
    """Triggering options of the backend"""

    TRIGGERED = 0  # acquisition gated by the monochromator trigger (cf. set_config)
    CONTINUOUS = 1  # free-running acquisition for a configured duration
class NidaqState(int, enum.Enum):
    """Possible States of the NIDAQ backend"""

    # Values reported by the backend state machine via the NIDAQ-FSMState PV.
    DISABLED = 0
    STANDBY = 1
    STAGE = 2
    KICKOFF = 3
    ACQUIRE = 4
    UNSTAGE = 5
class ScanRates(int, enum.Enum):
    """Sampling Rate options for the backend, in kHZ and MHz"""

    # Enum index written to the NIDAQ-SamplingRateRequested PV.
    HUNDRED_KHZ = 0  # 100 kHz
    FIVE_HUNDRED_KHZ = 1  # 500 kHz
    ONE_MHZ = 2  # 1 MHz
    TWO_MHZ = 3  # 2 MHz
    FOUR_MHZ = 4  # 4 MHz
    FIVE_MHZ = 5  # 5 MHz
    TEN_MHZ = 6  # 10 MHz
    FOURTEEN_THREE_MHZ = 7  # 14.286 MHz
class ReadoutRange(int, enum.Enum):
    """ReadoutRange in +-V"""

    # Enum index written to the NIDAQ-ReadoutRange PV; names give +-Volts.
    ONE_V = 0
    TWO_V = 1
    FIVE_V = 2
    TEN_V = 3
class EncoderFactors(int, enum.Enum):
    """Encoder Factors"""

    # Enum index written to the NIDAQ-EncoderFactor PV; names denote the
    # factor (X1_16 = 1/16, ..., X4 = 4).
    X1_16 = 0
    X1_8 = 1
    X1_4 = 2
    X1_2 = 3
    X1 = 4
    X2 = 5
    X4 = 6
@@ -0,0 +1,13 @@
{"type": "PixelMap",
"chips": [
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}
]
}
@@ -0,0 +1,26 @@
"""Utilities for building Timepix pixel-map payloads in Python."""
from __future__ import annotations
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import PixelMap
DEFAULT_TIMEPIX_CHIPS = 8
DEFAULT_PIXELS_PER_CHIP = 256 * 256


def create_single_energy_per_chip_pixel_map(
    num_chips: int = DEFAULT_TIMEPIX_CHIPS,
    pixels_per_chip: int = DEFAULT_PIXELS_PER_CHIP,
) -> PixelMap:
    """Create a pixel map where each chip maps fully to its own energy bin.

    Every pixel of chip ``k`` is assigned with weight 1.0 to energy bin ``k``.
    """
    if num_chips <= 0:
        raise ValueError("num_chips must be a positive integer.")
    if pixels_per_chip <= 0:
        raise ValueError("pixels_per_chip must be a positive integer.")
    chips = []
    for chip_index in range(num_chips):
        # One entry per pixel: full weight into this chip's own energy bin.
        chip_entries = [
            {"i": pixel_index, "p": [chip_index], "f": [1.0]}
            for pixel_index in range(pixels_per_chip)
        ]
        chips.append(chip_entries)
    return PixelMap(chips=chips)
+926
View File
@@ -0,0 +1,926 @@
"""
TimePix Detector class for interfacing with the TimePix detector. The timepix_signals module
implements the HTTP communication to the REST API for the tpx3app app. The implementation
of the backend is stored in the timepix_fly_client module. This is combined with the control
interface in EPICS, which is implemented via the 'ASItpxCam' class.
"""
from __future__ import annotations
import enum
import os
import threading
import time
import traceback
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
FileEventSignal,
PreviewSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35, ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
from superxas_bec.devices.timepix.pixel_map_utils import create_single_energy_per_chip_pixel_map
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
OtherConfigModel,
PixelMap,
)
if TYPE_CHECKING:
from bec_lib.messages import ScanStatusMessage
logger = bec_logger.logger
# pylint: disable=redefined-outer-name
class TDCEdge(int, enum.Enum):
    """TDC Edge enum options for TimePix detector."""

    # Which signal edge(s) the time-to-digital converter reacts to.
    RISING = 0
    FALLING = 1
    BOTH = 2
class TDCOuput(int, enum.Enum):
    """TDC Output enum options for TimePix detector."""

    # NOTE(review): class name is missing a 't' (TDCOuput vs TDCOutput);
    # kept as-is since renaming would break external references.
    ALL_CHANNELS = 0
    CHANNEL_0 = 1
    CHANNEL_1 = 2
    CHANNEL_2 = 3
    CHANNEL_3 = 4
class ACQUIRESTATUS(int, enum.Enum):
    """Acquire status enum options for TimePix detector."""

    DONE = 0
    ACQUIRING = 1  # or CAPTURING
class DETECTORSTATE(int, enum.Enum):
    """Detector state enum options for TimePix detector."""

    # NOTE(review): values presumably mirror the areaDetector
    # DetectorState_RBV enum — confirm against the IOC database.
    IDLE = 0
    ACQUIRE = 1
    READOUT = 2
    CORRECT = 3
    SAVING = 4
    ABORTING = 5
    ERROR = 6
    WAITING = 7
    INITIALIZING = 8
    DISCONNECTED = 9
    ABORTED = 10
class TRIGGERMODE(int, enum.Enum):
    """Trigger mode enum options for TimePix detector."""

    INTERNAL = 0
    EXTERNAL = 1
    SOFTWARE = 2
class TRIGGERSOURCE(int, enum.Enum):
    """Trigger source enum options for TimePix detector."""

    # Names refer to the HDMI connector and pin used as trigger input.
    HDMI1_1 = 0
    HDMI1_2 = 1
    HDMI1_3 = 2
    HDMI2_1 = 3
    HDMI2_2 = 4
    HDMI2_3 = 5
class EXPOSUREMODE(int, enum.Enum):
    """Exposure mode enum options for TimePix detector."""

    TIMED = 0  # fixed exposure time
    TRIGGER_WIDTH = 1  # exposure defined by the trigger pulse width
class DATASOURCE(int, enum.Enum):
    """Data source for AD Epics backend for Timepix."""

    NONE = 0
    PREVIEW = 1
    IMAGE = 2
class FILEWRITEMODE(int, enum.Enum):
    """HDF5 Plugin FileWrite Mode"""

    # Matches the areaDetector HDF5 plugin FileWriteMode options.
    SINGLE = 0
    CAPTURE = 1
    STREAM = 2
def load_pixel_map_from_json(file_path: str) -> PixelMap:
    """Read a pixel map from a JSON file on disk and validate it.

    Args:
        file_path (str): Path to the JSON file containing the pixel map.

    Returns:
        PixelMap: The loaded pixel map.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If reading or validating the pixel map fails.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Pixel map file not found: {file_path}")
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            raw_content = file.read()
        return PixelMap.model_validate_json(raw_content)
    except Exception as exc:
        # Wrap any read/validation failure so callers get a single error type.
        raise ValueError(f"Failed to load pixel map from {file_path}: {exc}") from exc
class ImagePlugin_Timepix(ImagePlugin_V35):
    """Custom Image Plugin for TimePix detector."""

    # Read-only, auto-monitored areaDetector frame counter (UniqueId_RBV).
    unique_id = Cpt(EpicsSignalRO, "UniqueId_RBV", auto_monitor=True)
class HDF5Plugin_Timepix(HDF5Plugin_V35):
    """Custom HDF5 Plugin for TimePix detector."""

    # Capture/WriteFile with readback; monitored by the writer status checks.
    capture = Cpt(EpicsSignalWithRBV, "Capture", auto_monitor=True)
    write_file = Cpt(EpicsSignalWithRBV, "WriteFile", auto_monitor=True)
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class TimePixControl(ADBase):
    """Interface for the TimePix EPICS control of the TimePix detector.

    Groups the camera driver and the image/HDF5 plugin sub-devices under
    one EPICS prefix.
    """

    cam = Cpt(ASItpxCam, "cam1:")
    image = Cpt(ImagePlugin_Timepix, "image1:")
    hdf = Cpt(HDF5Plugin_Timepix, "HDF1:")
# Module-level so that class bodies below (e.g. the Timepix signal
# definitions) can reference it directly.
DETECTOR_SHAPE = (512, 1024)  # Shape of the TimePix detector
class Timepix(PSIDeviceBase, TimePixControl):
    """
    TimePix class. The IOC is running with the prefix 'X10DA-ES-TPX1:'.

    The TimePixFly backend is running on p4-0017.psi.ch. Please check the port from the app
    running in headless server mode. The backend_rest url can for instance be 'p4-0017.psi.ch:8452'.
    The hostname needs to be set to the name of this machine, e.g. x10da-bec-001.psi.ch.
    """

    MIN_DETECTOR_READOUT_TIME = 2.1e-3  # Minimum readout time in seconds for ASI TimePix detector
    _DETECTOR_SHAPE = DETECTOR_SHAPE

    # Methods and properties exposed to BEC clients.
    USER_ACCESS = [
        "troin",
        "troistep",
        "get_pixel_map",
        "set_pixel_map",
        "set_pixel_map_from_json_file",
        "set_enable_xes",
    ]

    xes_data = Cpt(
        AsyncSignal,
        name="xes_data",
        ndim=2,
        max_size=1000,
        doc="Full XES data, 2D image with energypoints vs time bins.",
    )
    xes_spectra = Cpt(
        AsyncSignal,
        name="xes_spectra",
        ndim=1,
        max_size=1000,
        doc="1D spectra, integrated over energy bins.",
    )
    xes_energy_1 = Cpt(
        AsyncSignal,
        name="xes_energy_1",
        ndim=1,
        max_size=1000,
        doc="1D time spectra for energy bin 1.",
    )
    xes_energy_2 = Cpt(
        AsyncSignal,
        name="xes_energy_2",
        ndim=1,
        max_size=1000,
        doc="1D time spectra for energy bin 2.",
    )
    tds_period = Cpt(
        AsyncSignal,
        name="tds_period",
        ndim=0,
        async_update={"type": "add", "max_shape": [None]},
        max_size=1000,
        doc="TDS period recorded by the TimePixFly backend detector.",
    )
    total_periods = Cpt(
        AsyncSignal,
        name="total_periods",
        ndim=0,
        async_update={"type": "add", "max_shape": [None]},
        max_size=1000,
        doc="Total TDS periods recorded by the TimePixFly backend detector.",
    )
    total_events = Cpt(
        AsyncSignal,
        name="total_events",
        ndim=0,
        async_update={"type": "add", "max_shape": [None]},
        max_size=1000,
        doc="Total events recorded by the TimePixFly backend detector.",
    )
    preview = Cpt(
        PreviewSignal,
        name="preview",
        ndim=2,
        num_rotation_90=1,
        doc="Preview signal of the TimePix detector.",
    )
    static_spectra = Cpt(
        AsyncSignal,
        name="static_spectra",
        ndim=1,
        max_size=1000,
        acquisition_group="monitored",
        async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]},
        doc="Spectra signal of the TimePix detector.",
    )
    # Fix of copy-paste doc strings: signal 1 documents bin 1, signal 2 bin 2.
    xes_data_accumulated_1 = Cpt(
        AsyncSignal,
        name="xes_accumulated_energy_1",
        ndim=1,
        max_size=1000,
        doc="Accumulated 1D time spectra for energy bin 1.",
    )
    xes_data_accumulated_2 = Cpt(
        AsyncSignal,
        name="xes_accumulated_energy_2",
        ndim=1,
        max_size=1000,
        doc="Accumulated 1D time spectra for energy bin 2.",
    )
    file_event = Cpt(
        FileEventSignal, name="file_event", doc="File event signal for TimePix detector."
    )

    def __init__(
        self,
        *,
        name,
        prefix: str,
        backend_rest_url: str,
        hostname: str | None = None,
        socket_port: int = 0,
        enable_xes: bool = True,
        scan_info=None,
        device_manager=None,
        **kwargs,
    ):
        """
        Initialize the Timepix detector.

        Args:
            name (str): Name of the device.
            prefix (str): EPICS prefix for the device.
            backend_rest_url (str): URL of the TimePixFly backend REST API.
            hostname (str | None): Hostname of the machine running the backend. Defaults to None
                which will use the current machine's hostname.
            socket_port (int): Port for the socket connection to the backend. Defaults to 0
                which will use the default port from the backend.
            enable_xes (bool): Whether to enable XES data acquisition with TimePixFly backend is active. Defaults to True.
            scan_info: Scan information object, if available.
            device_manager: Device manager instance, if available.
            **kwargs: Additional keyword arguments for the base class.
        """
        self.backend = TimepixFlyBackend(
            backend_rest_url=backend_rest_url, hostname=hostname, socket_port=socket_port
        )
        self._pixel_map = None
        self._troistep = 1
        self._troin = 5000
        super().__init__(
            name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
        )
        self._poll_thread = threading.Thread(
            target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
        )
        self._poll_thread_kill_event = threading.Event()
        # Image poll rate for preview updates in Hz.
        self._poll_rate = 10
        self._enable_xes = enable_xes
        self._full_path = ""
        self._n_images = 0
        self._unique_array_id = 0
        self._pv_timeout = 5
        self._readout_time = self.MIN_DETECTOR_READOUT_TIME
        self.r_lock = threading.RLock()  # Lock to access the message buffer safely
        # Running sums for the per-energy-bin XES spectra; reset in on_stage.
        self.accumulated_data_e1 = None
        self.accumulated_data_e2 = None

    def stage(self) -> list[object] | StatusBase:  # type: ignore
        """Stage the device.

        Super stage not safe to call.."""
        self.stopped = False
        status = self.on_stage()  # pylint: disable=assignment-from-no-return
        if isinstance(status, StatusBase):
            return status
        return []

    def _poll_array_data(self):
        """Poll the array data for preview updates."""
        while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
            try:
                # First check if there is a new image. Read unique_id once to
                # avoid racing against a counter update between two gets.
                unique_id = self.image.unique_id.get()
                if unique_id == self._unique_array_id:
                    continue  # No new image, skip update
                self._unique_array_id = unique_id
                # Get new image data
                value = self.image.array_data.get()
                if value is None:
                    logger.info(f"No image data available for preview of {self.name}")
                    continue
                width = self.image.array_size.width.get()
                height = self.image.array_size.height.get()
                # Geometry correction for the image
                data = np.reshape(value, (height, width))
                logger.info(f"Setting preview data for {self.name} with shape {data.shape}")
                self.preview.put(data)
            except Exception:  # pylint: disable=broad-except
                content = traceback.format_exc()
                logger.error(
                    f"Error while polling array data for preview of {self.name}: {content}"
                )

    ###
    def msg_buffer_callback(
        self,
        start_frame: dict[
            Literal[
                "type", "Mode", "TRoiStart", "TRoiStep", "TRoiN", "NumEnergyPoints", "save_interval"
            ],
            Any,
        ],
        data_frames: list[
            dict[
                Literal["type", "period", "totalEvents", "TDSpectra", "beforeROI", "afterROI"], Any
            ]
        ],
        end_frame: dict[Literal["type", "error"], Any],
    ):
        """
        Callback method to be attached to the backend to process the message buffer. The callback expects
        start_frame, data_frames, and end_frame as arguments. Additionally, one may pass extra kwargs that
        will be passed to the callback function.

        Args:
            start_frame (dict): The StartFrame. Dictionary representation of detailed structure
                described in model .timepix_fly_client.timepix_fly_interface.TimepixStartFrame
            data_frames (list): List of XesData frames. Dictionary of structures described in
                model .timepix_fly_client.timepix_fly_interface.TimepixDataFrame
            end_frame (dict): The EndFrame. Dictionary representation of detailed structure
                described in model .timepix_fly_client.timepix_fly_interface.TimepixEndFrame
        """
        n_energy_points = start_frame.get("NumEnergyPoints", None)
        troin = start_frame["TRoiN"]
        if troin != self._troin:
            logger.error(f"Number of pixels {troin} does not match expected {self._troin}.")
        # Create return data
        xes_data = np.zeros((n_energy_points, troin), dtype=np.float32)  # dtype from backend code
        tds_period = []
        tds_total_events = 0
        total_periods = 0
        data_frame_freq = 131000 / start_frame.get("save_interval", 1)  # in Hz
        logger.info(
            f"Processing TimepixFly data: start_frame: {start_frame}, end_frame: {end_frame}"
        )
        if len(data_frames) == 0:
            logger.error(
                f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}"
            )
            # TODO this should no longer happen now as this was fixed in the backend..
        else:
            for msg in data_frames:
                tds_period.append(msg["period"])
                tds_total_events += msg["totalEvents"]
                # TDSpectra is interleaved over energy points; de-interleave
                # into rows of shape (n_energy_points, troin).
                for ii in range(n_energy_points):
                    xes_data[ii, :] += msg["TDSpectra"][ii::n_energy_points]
            tds_period = (
                np.array(tds_period) / start_frame.get("save_interval", 1) / data_frame_freq
            )
        total_periods = end_frame.get("periods", None)
        if total_periods is not None:
            self.total_periods.put(
                total_periods / start_frame.get("save_interval", 1) / data_frame_freq
            )
        else:
            logger.error(f"Received total_periods: {total_periods} from end_frame {end_frame}.")
        # Transpose to get shape (troin, n_energy_points)
        xes_data = xes_data.T
        # Put XES data
        self.tds_period.put(tds_period)
        self.total_events.put(tds_total_events)
        self.xes_data.put(
            xes_data, async_update={"type": "add", "max_shape": [None, troin, n_energy_points]}
        )
        if n_energy_points == 8:
            # Split the 8 energy points into two bins of 4 and accumulate.
            data_1 = np.sum(xes_data[:, 0:4], axis=1)
            data_2 = np.sum(xes_data[:, 4:8], axis=1)
            self.xes_energy_1.put(data_1, async_update={"type": "add", "max_shape": [None, troin]})
            self.xes_energy_2.put(data_2, async_update={"type": "add", "max_shape": [None, troin]})
            if self.accumulated_data_e1 is None:
                self.accumulated_data_e1 = data_1
            else:
                self.accumulated_data_e1 += data_1
            if self.accumulated_data_e2 is None:
                self.accumulated_data_e2 = data_2
            else:
                self.accumulated_data_e2 += data_2
            self.xes_data_accumulated_1.put(
                self.accumulated_data_e1,
                async_update={"type": "replace", "max_shape": [None, troin]},
            )
            self.xes_data_accumulated_2.put(
                self.accumulated_data_e2,
                async_update={"type": "replace", "max_shape": [None, troin]},
            )
        self.xes_spectra.put(
            xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]}
        )
        logger.debug(f"Device data set for Timepix with {tds_period}, {tds_total_events}")

    ### User ACCESS methods
    def get_pixel_map(self) -> dict:
        """Get the current pixel map as a dictionary.

        Uses the ``pixel_map`` property so that a default map is created
        lazily instead of raising an AttributeError when none was set yet.
        """
        return self.pixel_map.model_dump()

    def set_pixel_map(self, pixel_map: dict) -> None:
        """Set the pixel map from a dictionary."""
        self._pixel_map = PixelMap.model_validate(pixel_map)

    def set_pixel_map_from_json_file(self, file_path: str) -> None:
        """Set the pixel map from a JSON file.

        Args:
            file_path (str): Path to the JSON file containing the pixel map.
        """
        pixel_map = load_pixel_map_from_json(file_path)
        self._pixel_map = pixel_map

    def set_enable_xes(self, enable: bool) -> None:
        """Enable or disable XES data acquisition.

        Args:
            enable (bool): Whether to enable XES data acquisition.
        """
        self.enable_xes = enable

    @property
    def enable_xes(self) -> bool:
        """Get whether XES data acquisition is enabled."""
        return self._enable_xes

    @enable_xes.setter
    @typechecked
    def enable_xes(self, value: bool):
        """Set whether XES data acquisition is enabled."""
        self._enable_xes = value
        self._enable_xes_settings(value)
        # #TODO Update device manager config if available
        # if self.device_manager is not None:
        #     dev_obj = self.device_manager.devices.get(self.name, None)
        #     if dev_obj is not None:
        #         cfg = dev_obj.get_device_config()
        #         if "enable_xes" in cfg and cfg["enable_xes"] != value:
        #             cfg["enable_xes"] = value
        #             dev_obj.set_device_config({"enable_xes": value})
        #             logger.info(
        #                 f"Updated 'enable_xes' to {value} in device manager for {self.name}"
        #             )

    @property
    def pixel_map(self) -> PixelMap:
        """Get the current pixel map of the TimePix detector.

        Lazily creates the default single-energy-per-chip map on first access.
        """
        if self._pixel_map is None:
            try:
                pixel_map = create_single_energy_per_chip_pixel_map()
                self._pixel_map = pixel_map
            # pylint: disable=broad-except
            except Exception as exc:
                content = traceback.format_exc()
                logger.error(f"Failed to load default pixel map: {content}")
                # Chain the original exception for easier debugging.
                raise ValueError(f"Failed to generate default pixel map: {content}") from exc
        return self._pixel_map

    @pixel_map.setter
    @typechecked
    def pixel_map(self, value: PixelMap):
        self._pixel_map = value

    @property
    def troistep(self) -> int:
        """Get the current ROI step size."""
        return self._troistep

    @troistep.setter
    @typechecked
    def troistep(self, value: int):
        """Set the ROI step size."""
        if value <= 0:
            raise ValueError("ROI step size must be a positive integer.")
        self._troistep = value

    @property
    def troin(self) -> int:
        """Get the current ROI number of pixels."""
        return self._troin

    @troin.setter
    @typechecked
    def troin(self, value: int):
        """Set the ROI number of pixels."""
        if value <= 0:
            raise ValueError("ROI number of pixels must be a positive integer.")
        self._troin = value

    ######################################################################
    ### Beamline specific methods for the TimePix Detector integration ###
    ######################################################################

    def on_init(self) -> None:
        """
        Called when the device is initialized.

        No signals are connected at this point. If you like to
        set default values on signals, please use on_connected instead.
        """
        start_time = time.time()
        logger.info(f"Generating default pixel map for TimePix detector {self.name}...")
        self.pixel_map = create_single_energy_per_chip_pixel_map()
        logger.info(
            f"Default pixel map for TimePix detector {self.name} generated after {time.time() - start_time:.3f} seconds."
        )

    def _enable_xes_settings(self, enabled: bool) -> None:
        """Enable XES specific settings for the TimePix detector."""
        enabled_value = 1 if enabled else 0
        self.cam.tdc1_enable.set(enabled_value).wait(timeout=self._pv_timeout)
        self.cam.tdc2_enable.set(enabled_value).wait(timeout=self._pv_timeout)
        self.cam.raw_enable.set(enabled_value).wait(timeout=self._pv_timeout)
        if enabled:
            self.cam.tdc1_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
            self.cam.tdc1_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
            self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
            self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)

    def on_connected(self) -> None:
        """
        Called after the device is connected and its signals are connected.

        Default values for signals should be set here.
        """
        start_time = time.time()
        logger.info(f"On Connected of TimePix detector {self.name}...")
        # Prepare TimePix Detector
        self._enable_xes_settings(self.enable_xes)
        self.cam.trigger_mode.set(TRIGGERMODE.INTERNAL).wait(timeout=self._pv_timeout)
        self.cam.trigger_source.set(TRIGGERSOURCE.HDMI1_1).wait(timeout=self._pv_timeout)
        self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
        # Reset array counter on connect
        self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
        # ------------------
        # Prepare file writing through AD HDF5 plugin
        # -----------------
        self.hdf.enable.set(1).wait(timeout=self._pv_timeout)
        self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(timeout=self._pv_timeout)
        self.hdf.auto_save.set(1).wait(timeout=self._pv_timeout)
        self.hdf.file_template.set("%s%s").wait(timeout=self._pv_timeout)
        self.hdf.lazy_open.set(1).wait(timeout=self._pv_timeout)
        self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
        # ------------------
        # Prepare TimePixFly backend
        # -----------------
        # Prepare backend for TimePixFly
        self.backend.on_connected()
        # Register the callback for processing data received by the backend
        self.backend.add_callback(self.msg_buffer_callback)
        self._poll_thread.start()
        logger.info(
            f"TimePix detector {self.name} connected and initialized after {time.time() - start_time:.3f} seconds."
        )
        # Subscribe to new image updates
        self.image.unique_id.subscribe(self._on_new_image_received)

    def _on_new_image_received(self, value: int, old_value: int, **kwargs):
        """Callback for image unique ID updates to trigger preview update."""
        if value == old_value:
            return  # No new image, or counter reset
        try:
            # Get new image data
            width = self.image.array_size.width.get()
            height = self.image.array_size.height.get()
            array_data = self.image.array_data.get()
            if array_data is None:
                logger.info(f"No image data available for preview of {self.name}")
                return
            # Geometry correction for the image
            data = np.sum(np.reshape(array_data, (height, width)), axis=1)
            self.static_spectra.put(data)
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(f"Error while updating preview for {self.name} on image update: {content}")

    def on_stage(self) -> StatusBase | None:
        """Called while staging the device."""
        self.accumulated_data_e1 = None
        self.accumulated_data_e2 = None
        scan_msg: ScanStatusMessage = self.scan_info.msg  # type: ignore
        exp_time = scan_msg.scan_parameters.get("exp_time", 0)
        if exp_time - self._readout_time <= 0:
            raise ValueError(
                f"Exposure time {exp_time} must be greater than readout time {self._readout_time}."
            )
        burst_images = scan_msg.scan_parameters.get("frames_per_trigger", 1)
        self._n_images = scan_msg.num_points * burst_images
        # Camera has to be set to burst_images, each step will get an individual trigger
        self.cam.acquire_time.set(exp_time - self._readout_time).wait(timeout=self._pv_timeout)
        self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout)
        self.cam.num_images.set(burst_images).wait(timeout=self._pv_timeout)
        self.cam.data_source.set(DATASOURCE.IMAGE).wait(timeout=self._pv_timeout)
        # Setup file writing
        self._full_path = get_full_path(scan_msg, name="timepix")
        file_path = "/".join(self._full_path.split("/")[:-1])
        file_name = self._full_path.split("/")[-1]
        self.cam.array_callbacks.set(1).wait(5)  # Enable array callbacks
        # self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
        self.hdf.file_path.set(file_path).wait(5)
        self.hdf.file_name.set(file_name).wait(5)
        # Setup file writing for the total expected number of images
        self.hdf.num_capture.set(self._n_images).wait(5)
        self.hdf.capture.put(1)
        self.file_event.put(
            file_path=self._full_path,
            done=False,
            successful=False,
            hinted_h5_entries={"data": "/entry/data/data"},
        )
        # -------------------------
        # XES specific staging
        if self.enable_xes:
            # Prepare TimePixFly
            other_config = OtherConfigModel(
                TRoiStep=self.troistep,
                TRoiN=self.troin,
                output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
                save_interval=int(131000 / 5) - 5,  # Save interval in 131kHz units,
            )
            logger.debug(f"Current TimePixFly configuration: {other_config}")
            pixel_map = self.pixel_map
            self.backend.on_stage(other_config=other_config, pixel_map=pixel_map)
            # Fetch the backend socket info
            net_add = self.backend.timepix_fly_client.get_net_addresses()
            logger.debug(f"Using net_add for timepix_fly backend {net_add}")
            self.cam.raw_file_template.set("").wait(timeout=self._pv_timeout)
            self.cam.raw_file_path.set(f"tcp://connect@{net_add.address}").wait(
                timeout=self._pv_timeout
            )

    def on_unstage(self) -> None:
        """Called while unstaging the device."""
        # TODO what should happen for unstage? Make sure that acquisition is not running?
        # self.backend.on_unstage()
        # self.cam.acquire.put(0)
        # status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)

    def on_pre_scan(self) -> StatusBase:
        """Called right before the scan starts on all devices automatically."""
        status_camera = CompareStatus(
            self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
        )
        status_writer = CompareStatus(
            self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
        )
        status = status_camera & status_writer
        self.cancel_on_stop(status)
        return status

    def on_trigger(self) -> DeviceStatus | StatusBase | None:
        """Called when the device is triggered."""
        status_camera = TransitionStatus(
            self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
        )
        img_counter = self.hdf.num_captured.get()
        status_backend = None
        # First we make sure that the backend reach 'config' state. This needs to happend before each trigger.
        if self.enable_xes is True:
            status_backend_config = DeviceStatus(self)
            self.cancel_on_stop(status_backend_config)
            self.backend.timepix_fly_client.add_status_callback(
                status=status_backend_config,
                success=[TimePixFlyStatus.CONFIG],
                error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
            )
            try:
                status_backend_config.wait(timeout=5)
            except TimeoutError:
                # pylint:disable=raise-missing-from
                raise TimeoutError(
                    f"TimePixFly backend of device {self.name} failed to reach 'config' state in trigger"
                )
            # Prepare backend to be ready to receive trigger
            status = self.backend.on_trigger()
            try:
                status.wait(timeout=5)  # Wait until backend trigger is done
            # pylint:disable=raise-missing-from
            except Exception:
                logger.error(
                    f"TimePixFly backend of device {self.name} failed to prepare for trigger"
                )
                raise TimeoutError(
                    f"TimePixFly backend of device {self.name} failed to reach 'trigger' state in trigger"
                )
            # Status that resolves once the trigger is done
            status_backend = self.backend.on_trigger_finished()
        if status_backend is not None:
            return_status = status_camera & status_backend
        else:
            return_status = status_camera
        self.cancel_on_stop(return_status)
        self.cam.acquire.put(1)
        return return_status

    def on_complete(self) -> DeviceStatus | StatusBase | None:
        """Called to inquire if a device has completed a scans."""
        # Status Camera
        status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
        # Status Writer
        st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
        st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
        status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
        status_writer = st1 & st2 & status_written_images
        # Status Backend
        status_backend = None
        if self.enable_xes is True:
            # Add callback to the backend complete handling
            status_backend = self.backend.on_complete(status=status_backend)
        # Combine the statuses
        if status_backend is not None:
            return_status = status_backend & status_camera & status_writer
        else:
            return_status = status_camera & status_writer
        return_status.add_callback(self._complete_callback)
        self.cancel_on_stop(return_status)
        return return_status

    def _complete_callback(self, status: CompareStatus) -> None:
        """Callback for when the device completes a scan.

        Emits a single file event; only the 'successful' flag depends on the
        status outcome.
        """
        self.file_event.put(
            file_path=self._full_path,
            done=True,
            successful=bool(status.success),
            hinted_h5_entries={"data": "/entry/data/data"},
        )

    def on_stop(self) -> None:
        """Called when the device is stopped."""
        # Camera
        self.cam.acquire.put(0)
        self.hdf.capture.put(0)
        # Backend
        if self.enable_xes is True:
            self.backend.on_stop()

    def on_destroy(self):
        """Cleanup method to stop the device and clean up resources."""
        self.cam.acquire.put(0)
        self.hdf.capture.put(0)
        self._poll_thread_kill_event.set()
        # Give the poll thread a chance to exit cleanly (it may never have
        # been started if on_connected was not called).
        if self._poll_thread.is_alive():
            self._poll_thread.join(timeout=2)
        self.backend.on_stop()
        self.backend.on_destroy()
# pylint: disable=protected-access
if __name__ == "__main__":  # pragma: no cover
    # Manual smoke test: stage/trigger/complete a few scans against the real IOC.
    timepix = Timepix(
        name="timepix",
        prefix="X10DA-ES-TPX1:",
        backend_rest_url="P6-0008.psi.ch:8452",  # "P4-0017.psi.ch:8452",
        hostname="x10da-bec-001.psi.ch",
    )
    try:
        # timepix.wait_for_connection(all_signals=True, timeout=10)
        timepix.on_connected()
        print("Timepix connected and initialized.")
        for exp_time, frames_per_trigger, runs in zip([0.1, 1, 0.2], [20, 5, 1], [10, 5, 30]):
            print(
                f"Sleeping for 0.5 seconds before starting the scan with exp_time={exp_time} "
                f"and frames_per_trigger={frames_per_trigger}. and runs {runs}"
            )
            time.sleep(0.5)
            timepix.scan_info.msg.scan_parameters.update(
                {
                    "exp_time": exp_time,  # Set exposure time to 1 second for testing
                    "frames_per_trigger": frames_per_trigger,  # Set frames per trigger to 5 for testing
                }
            )
            timepix.stage()
            logger.warning("Timepix on stage done")
            timepix.pre_scan()
            logger.warning("Timepix on pre_scan done")
            for run in range(runs):
                logger.warning(f"Starting trigger run {run + 1}/{runs}")
                status = timepix.trigger()
                logger.warning("Timepix triggered")
                start_time = time.time()
                while not status.done:
                    try:
                        status.wait(timeout=1)
                    except Exception:
                        logger.warning(f"Trigger not done after ({time.time() - start_time:.2f}s)")
                    if time.time() - start_time > 20:
                        logger.warning("Breaking loop manually after 20 seconds of waiting.")
                        # set_exception expects an exception instance, not a string.
                        status.set_exception(
                            TimeoutError("Failed to complete trigger after 20 seconds")
                        )
                        break
                if hasattr(timepix, "_msg_dump"):
                    n_messages = len(timepix._msg_dump)
                    logger.warning(f"Messages in Buffer is {n_messages}")
                    if n_messages > 0:
                        msg = timepix._msg_dump[-1]
                        logger.warning(
                            f"Last message had N start_frame : {msg.get('start_frame')}, N data_frames: {len(msg.get('data_frame'))}, N end_frame : {msg.get('end_frame')}"
                        )
            status = timepix.complete()
            print("Waiting for timepix to complete.")
            status.wait(timeout=10)
            print("Timepix scan completed.")
            timepix.unstage()
            # timepix._msg_dump.clear()
            print("Timepix unstaged.")
    except Exception:
        content = traceback.format_exc()
        logger.error(f"An error occurred: {content}")
    finally:
        timepix.destroy()
        print("Timepix destroyed.")
@@ -0,0 +1 @@
from .timepix_fly_backend import TimepixFlyBackend
@@ -0,0 +1 @@
from .timepix_fly_mock_server import TimePixFlyMockServer
@@ -0,0 +1,49 @@
"""Module to control the Timepix Fly mock server."""
import requests
class TimePixFlyMockServer:
    """
    A mock server for the Timepix Fly detector that simulates the behavior of the actual server.

    This is used for testing purposes and does not require a real Timepix Fly detector.
    """

    def __init__(self, host: str = "localhost", port: int = 8080, logger=None):
        """
        Initialize the TimePixFlyMockServer with a host and port.

        Args:
            host (str): The host address for the mock server. Default is "localhost".
            port (int): The port number for the mock server. Default is 8080.
            logger: An optional logger to log messages. If not provided, messages will be printed to the console.
        """
        self.host = host
        self.port = port
        self.logger = logger

    def add_log(self, message: str) -> None:
        """
        Add a log message to the logger if available.

        If no logger is provided, it will print the message to the console.

        Args:
            message (str): The message to log.
        """
        # Fall back to stdout when no logger was supplied.
        if self.logger is None:
            print(message)
            return
        self.logger.info(message)

    def start_acquisition(self):
        """
        Simulate starting an acquisition on the Timepix Fly detector.

        This method does not perform any real acquisition but simulates the behavior.
        """
        endpoint = f"http://{self.host}:{self.port}/measurement/start"
        try:
            requests.get(endpoint, timeout=0.2)
        except requests.exceptions.RequestException:
            # Ignore all exceptions as there is currently no return value for the request
            self.add_log("Failed to start acquisition on Timepix Fly mock server.")
        else:
            self.add_log("Acquisition started on Timepix Fly mock server.")
@@ -0,0 +1,559 @@
"""
Implementation of the Timepix Fly Backend. It handles the communication
with the TimepixFly backend (https://github.com/paulscherrerinstitute/TimePixFly).
Please be aware that this was developed against the 'dev' branch (2025/08/15).
It communicates with the backend through a simple Client (TimepixFlyClient)
that handles the REST and WebSocket communication + callbacks, and provides
hooks for all the relevant ophyd interface, 'on_stage',
'on_trigger', 'on_complete', 'on_stop', ...
"""
from __future__ import annotations
import json
import signal
import socket
import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Callable, Tuple
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimepixFlyClient,
TimePixFlyStatus,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
OtherConfigModel,
PixelMap,
)
if TYPE_CHECKING:
from ophyd import DeviceStatus
from superxas_bec.devices.timepix.timepix import Timepix
logger = bec_logger.logger
# pylint: disable=line-too-long
# pylint: disable=redefined-outer-name
class TimepixFlyBackendException(Exception):
    """Custom exception for Timepix Fly Backend errors.

    Raised e.g. by ``TimepixFlyBackend.on_connected`` when the local data
    server cannot be started.
    """
class TimepixFlyBackend:
"""Timepix Fly Backend Device."""
    def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0):
        """
        Initialize the Timepix Fly Backend device.

        Parameters:
            backend_rest_url: The REST URL of the backend.
            hostname: The hostname of the device, defaults to None, which means
                socket.getfqdn() will be used to fetch hostname. It is recommended to specify
                the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch' for use
                at the beamline computers of SLS, or localhost for local testing of the backend.
            socket_port: The socket port to use. Defaults to 0,
                which lets the OS choose an available port.
        """
        # WebSocket endpoint lives under /ws of the REST url.
        ws_url = f"{backend_rest_url}/ws"
        self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url)
        if hostname is None:
            hostname = socket.getfqdn()
        self.hostname = hostname
        self.socket_port = socket_port  # Use 0 as default to let the OS choose an available port
        # Name-mangled buffer for raw messages received from the backend.
        self.__msg_buffer = []
        # Registered data callbacks: id -> (callable, extra kwargs).
        self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {}
        # Status objects tracked so they can be cancelled on stop.
        self._status_objects: list[StatusBase] = []
        self._decoder = json.JSONDecoder()
        # Data-receiving TCP server and its worker thread (started later).
        self._socket_server: socket.socket | None = None
        self._data_thread: threading.Thread | None = None
        self._data_thread_shutdown_event = threading.Event()
###################################################
###### Hooks for the PSIDeviceBase interface ######
###################################################
    def on_connected(self):
        """Called if it is ensured that the device is connected.

        Connects the REST/WebSocket client and starts the local data server,
        waiting up to 5 s for it to come up.

        Raises:
            TimepixFlyBackendException: If the data server could not be started.
        """
        time_started = time.time()
        logger.info("Connecting to Timepix Fly backend...")
        try:
            self.timepix_fly_client.on_connected()
            status = self.start_data_server()
            status.wait(timeout=5)
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(f"Error starting data server: {content}")
            # pylint: disable=raise-missing-from
            raise TimepixFlyBackendException(
                f"Could not start data server on {self.hostname}:{self.socket_port}. Please check logs for more details."
            )
        logger.info(
            f"Timepix Fly backend connected and data server started on {self.hostname}:{self.socket_port} after {time.time() - time_started:.3f} seconds."
        )
    def on_stage(self, other_config: OtherConfigModel, pixel_map: PixelMap):
        """
        Hook for on stage logic.

        Waits for the backend to be in CONFIG state, pushes the configuration
        and pixel map, then waits again for the backend to re-enter CONFIG.

        Args:
            other_config (OtherConfigModel): The configuration for the Timepix Fly detector.
            pixel_map (PixelMap): The pixel map for the Timepix Fly detector.

        Raises:
            TimeoutError: If the backend does not reach CONFIG state within 5 s
                before or after applying the configuration.
        """
        time_started = time.time()
        # First wait: backend must already be in CONFIG before we reconfigure.
        status = StatusBase()
        self.cancel_on_stop(status)
        self.timepix_fly_client.add_status_callback(
            status,
            success=[TimePixFlyStatus.CONFIG],
            error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
        )
        try:
            status.wait(timeout=5.0)
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(
                f"Error while waiting for Timepix Fly backend to be in config state: {content}"
            )
            # pylint: disable=raise-missing-from
            raise TimeoutError(
                f"Timepix Fly backend state did not reach config state, running into timeout. Error traceback {content}."
            )
        # Second wait: backend must return to CONFIG after the new config is set.
        # NOTE(review): unlike above, this status is not registered with
        # cancel_on_stop — confirm whether that is intentional.
        status = StatusBase()
        self.timepix_fly_client.add_status_callback(
            status,
            success=[TimePixFlyStatus.CONFIG],
            error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
        )
        logger.debug(f"Setting other config, backend {other_config}")
        self.timepix_fly_client.set_other_config(other_config)
        self.timepix_fly_client.set_pixel_map(pixel_map)
        try:  # TODO make asynchronous
            status.wait(timeout=5.0)
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(
                f"Error while waiting for Timepix Fly backend to be in config state after setting config: {content}"
            )
            # pylint: disable=raise-missing-from
            raise TimeoutError(
                f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
            )
        logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
    def on_trigger(
        self, status: StatusBase | DeviceStatus | None = None
    ) -> StatusBase | DeviceStatus:
        """
        Hook for on_trigger logic. It adds a status callback based on the TimePixFlyStatus.

        The backend needs to get into the AWAIT_CONNECTION state before starting the acquisition.

        Args:
            status (StatusBase | DeviceStatus | None): The status object to track the operation.
                If None, a new StatusBase object will be created.

        Returns:
            StatusBase | DeviceStatus: The status object that will be updated with the operation's result
        """
        # TODO, could be removed as it's checkd from the top level!
        if status is None:
            status = StatusBase()
            # NOTE(review): only statuses created here are registered with
            # cancel_on_stop; caller-supplied statuses are assumed to be
            # managed by the caller — confirm this is intended.
            self.cancel_on_stop(status)
        self.timepix_fly_client.add_status_callback(
            status,
            success=[TimePixFlyStatus.AWAIT_CONNECTION],
            error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
        )
        # Kick off the acquisition on the backend.
        self.timepix_fly_client.start()
        return status
def on_trigger_finished(
    self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
    """
    Hook for on_trigger_finished logic.

    The returned status resolves once the backend has dropped back into the
    CONFIG state after a trigger; in practice, a full scan happens during
    on_trigger. The status fails if the backend reports an exception state or
    shuts down.

    Args:
        status (StatusBase | DeviceStatus | None): Status object used to track
            the operation. A fresh StatusBase is created when None is given.

    Returns:
        StatusBase | DeviceStatus: The status object that resolves with the operation's result.
    """
    if status is None:
        status = StatusBase()
    self.cancel_on_stop(status)
    done_states = [TimePixFlyStatus.CONFIG]
    failure_states = [TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN]
    self.timepix_fly_client.add_status_callback(
        status, success=done_states, error=failure_states
    )
    return status
def on_complete(
    self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
    """
    Hook for on_complete logic.

    The returned status resolves once the backend is back in the CONFIG state
    after a single acquisition; it fails on a backend exception or shutdown.

    Args:
        status (StatusBase | DeviceStatus | None): Status object used to track
            the operation. A fresh StatusBase is created when None is given.

    Returns:
        StatusBase | DeviceStatus: The status object that resolves with the operation's result.
    """
    if status is None:
        status = StatusBase()
    self.cancel_on_stop(status)
    done_states = [TimePixFlyStatus.CONFIG]
    failure_states = [TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN]
    self.timepix_fly_client.add_status_callback(
        status, success=done_states, error=failure_states
    )
    return status
def on_unstage(self) -> "StatusBase | None":
    """Hook for on_unstage logic.

    Currently a no-op that returns None: no status tracking is performed on
    unstage. If a wait for the backend CONFIG state is needed in the future,
    create a StatusBase, register it via cancel_on_stop and
    timepix_fly_client.add_status_callback (success=[CONFIG],
    error=[EXCEPT, SHUTDOWN]) and return it.

    Returns:
        StatusBase | None: Always None in the current implementation. The
            annotation was corrected from ``StatusBase``, which the previous
            (fully commented-out) body never returned.
    """
    return None
def on_destroy(self):
    """Hook for on_destroy logic.

    Shuts down the TimePixFly client, signals the data poll thread to exit and
    closes the socket server if one was started.
    """
    self.timepix_fly_client.shutdown()
    # Signal the poll loop to exit before joining the thread.
    self._data_thread_shutdown_event.set()
    data_thread = self._data_thread
    if data_thread is not None and data_thread.is_alive():
        data_thread.join(timeout=1)  # Allow the data thread to finish
        if data_thread.is_alive():
            logger.error(
                "Data thread poll loop of timepix_fly_backend did not stop within 1 second."
            )
    if self._socket_server is None:
        return
    try:
        logger.info(f"Closing socket server on {self.hostname}:{self.socket_port}.")
        self._socket_server.close()
    # pylint: disable=broad-except
    except Exception:
        content = traceback.format_exc()
        logger.error(f"Error closing socket server: {content}")
def on_stop(self):
    """Hook for on_stop logic.

    Cancels all pending status objects first (releasing any waiting callers),
    then asks the backend client to abort a still-running collection.
    """
    self.stop_all_status_objects()
    self.timepix_fly_client.stop_running_collection()
####################################################
########## Custom Methods for the Backend ##########
####################################################
def cancel_on_stop(self, status: StatusBase):
    """Register a status object to be cancelled when the stop method is called.

    The object is appended to the internal list that stop_all_status_objects
    consumes when the device is stopped.

    Args:
        status (StatusBase): The status object to cancel on stop.
    """
    self._status_objects.append(status)
def stop_all_status_objects(self):
    """Fail every still-pending status object and clear the registry.

    Each unresolved StatusBase gets a RuntimeError set as its exception so that
    callers waiting on it are released immediately; already-finished objects
    are skipped.
    """
    for pending in self._status_objects:
        # pylint: disable=protected-access
        with pending._lock:
            if pending.done:
                continue
            pending.set_exception(
                RuntimeError("Stop called on device, all status objects cancelled.")
            )
            logger.info(f"Cancelled status object: {pending}")
    self._status_objects.clear()
def add_callback(self, callback: callable, kwd: dict | None = None) -> str:
    """
    Add a callback that will be executed whenever an acquisition is completed. This is
    determined by receiving an EndFrame message from the backend. There will always be
    a StartFrame message, followed by optional DataFrame messages, and finally
    an EndFrame message. The callback will be called with the StartFrame, all DataFrames
    and the EndFrame message as arguments, along with any additional keyword arguments
    provided when registering the callback.

    The callback signature needs to be:

        def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, **kwd) -> None:

    with:
    - start_frame (dict): The first message received, typically containing metadata.
    - data_frames (list[dict]): A list of all data frames received during the acquisition.
    - end_frame (dict): The last message received, typically containing the EndFrame type.
    - any additional keyword arguments provided when registering the callback.

    Args:
        callback (callable): The callback function to be called.
        kwd (dict | None): Additional keyword arguments to pass to the callback; they
            will be unpacked when calling the callback. If None, an empty dictionary
            will be used.

    Returns:
        str: A unique identifier for the callback, usable with remove_callback.
    """
    if kwd is None:
        kwd = {}
    # Store the key as a string: the id is returned to the caller as a string
    # and remove_callback looks it up as a string. The previous implementation
    # stored the uuid.UUID object as the key, so removal could never match.
    cb_id = str(uuid.uuid4())
    self.callbacks[cb_id] = (callback, kwd)
    logger.info(f"Callback {callback.__name__} added with UUID {cb_id}.")
    return cb_id
def remove_callback(self, cb_id: str):
    """
    Remove a previously registered callback by its unique identifier.

    Unknown identifiers are logged as a warning and otherwise ignored.

    Args:
        cb_id (str): The unique identifier of the callback to remove.
    """
    if cb_id not in self.callbacks:
        logger.warning(f"Callback with UUID {cb_id} not found.")
        return
    self.callbacks.pop(cb_id)
    logger.info(f"Callback with UUID {cb_id} removed.")
def start_data_server(self) -> StatusBase:
    """
    Start the data server to receive data from the Timepix Fly backend over a socket connection.

    It will try to decipher the hostname through socket.getaddrinfo, and if multiple addresses
    are found, it will use the first one. Please note that depending on the network configuration,
    the hostname might not have the correct domain name attached, so it is recommended to specify
    the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'.

    The method creates a socket server that listens for incoming connections on the specified
    hostname and port. It starts a thread that continuously receives data from the socket,
    decodes the received JSON data, and processes it. The data is expected to be in JSON format,
    with each message ending with a trailing byte "}\\n".

    Returns:
        StatusBase: A status object that indicates if the data server thread is ready to accept
                    connections. High level implementation should ensure that the data server is
                    started (status.wait(timeout=4)) before any data is sent from the backend.

    Raises:
        RuntimeError: If the hostname cannot be resolved.
        TimepixFlyBackendException: If the data server thread is already running.
    """
    info = socket.getaddrinfo(
        self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM
    )
    if len(info) == 0:
        raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.")
    if len(info) > 1:
        logger.info(
            f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}"
        )
    family, socktype, proto, _, sockaddr = info[0]
    # SO_REUSEADDR must be set BEFORE bind() to take effect; the previous
    # implementation called setsockopt after socket.create_server had already
    # bound the address, which had no effect. Build the socket explicitly.
    self._socket_server = socket.socket(family, socktype, proto)
    self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._socket_server.bind(sockaddr)
    self._socket_server.listen(1)  # backlog of 1; only the backend connects
    # Set the hostname and socket_port to the address that was actually bound
    # (relevant when socket_port was 0 and the OS picked a free port).
    self.hostname, self.socket_port = self._socket_server.getsockname()
    logger.info(
        f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections."
    )
    # Create status object to return for the high level implementations
    status = StatusBase()
    if self._data_thread is not None and self._data_thread.is_alive():
        raise TimepixFlyBackendException(
            "Data server thread is already running on timepix_fly_backend."
        )
    self._data_thread_shutdown_event.clear()
    self._data_thread = threading.Thread(
        target=self._receive_data_on_socket, kwargs={"status": status}
    )
    self._data_thread.start()
    return status
def _receive_data_on_socket(self, status: StatusBase):
    """
    Background loop running in a thread, that receives data from the
    timepix fly backend over socket_server. The backend reconnects for every acquisition (trigger),
    to this socket. Therefore, it is important to handle all connections and disconnections properly.

    The buffer variable stores a string stream of received data. Whenever a trailing byte "}\\n" is found
    in the buffer, the buffer is split into chunks of received data and each chunk is decoded
    as a JSON object. The decoded objects are then processed, and if an EndFrame message is received,
    the registered callbacks are executed with the StartFrame, all DataFrames, and the EndFrame message.

    Args:
        status (StatusBase): Marked finished as soon as the server is ready to
            accept connections.
    """
    buffer = ""
    # NOTE(review): buffer is only reset once here, not per connection — a
    # partial message left over from a dropped connection could prepend to the
    # next one; verify this is intended.
    self._socket_server.settimeout(
        0.1
    )  # Set short socket timeout to avoid blocking the thread loop
    status.set_finished()  # Indicate that the socket server is ready to accept connections
    while not self._data_thread_shutdown_event.is_set():  # Shutdown event
        try:
            # blocks until connected or timeout reached
            conn, addr = self._socket_server.accept()
        except socket.timeout:
            continue  # Timeout is okay, continue
        except Exception:  # pylint: disable=broad-except
            # Log error, check if shutdown event is set.
            # Shutdown event should be set before socket_server.close() is called.
            content = traceback.format_exc()
            logger.error(f"Error accepting connection: {content}")
            continue
        logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
        # Clear the message buffer before entering the loop: leftover messages
        # would corrupt the frame grouping of the new acquisition.
        if self.__msg_buffer:
            logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
        self.__msg_buffer.clear()
        conn.settimeout(0.1)  # Set timeout for connection to avoid blocking in recv
        with conn:
            while not self._data_thread_shutdown_event.is_set():
                try:
                    # A 4096-byte recv may split a JSON message; the buffer
                    # split logic below reassembles partial messages.
                    chunk = conn.recv(4096)  # Adjust buffer size as needed
                except socket.timeout:
                    # Timeout is okay, continue in loop
                    continue
                except Exception as e:  # pylint: disable=broad-except
                    logger.error(f"Connection error: {e}. Closing connection.")
                    # conn = None #TODO should we reset conn?
                    break
                if not chunk:
                    # Receiving an empty chunk means the connection was closed
                    # conn = None #TODO should we reset conn?
                    break
                buffer += chunk.decode("utf-8")
                # Check if trailing byte "}\n" present in buffer
                buffer_chunks = buffer.split("}\n")
                for entry in buffer_chunks[:-1]:
                    # Process all complete JSON objects in the buffer;
                    # re-append the "}" that split() consumed.
                    self._decode_received_data(entry + "}")
                # Keep the last incomplete chunk.
                # If the buffer ended with "}\n", this will be an empty string.
                buffer = buffer_chunks[-1]
def _decode_received_data(self, buffer: str) -> None:
    """
    Decode one JSON message received from the socket and append it to the
    internal message buffer. When an EndFrame message arrives, all registered
    callbacks are executed and the buffer is reset.

    Args:
        buffer (str): The JSON string received from the socket.
    """
    try:
        obj, _ = self._decoder.raw_decode(buffer)
    except json.JSONDecodeError:
        logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
        return  # TODO should this raise, or only log error as of now?
    self.__msg_buffer.append(obj)
    if obj.get("type", "") != "EndFrame":
        # Not the end of an acquisition yet; keep buffering.
        return
    try:
        # The EndFrame message completes an acquisition: run the callbacks.
        logger.debug("Running callbacks")  # was an f-string with no placeholder
        self.run_msg_callbacks()
    except Exception:  # pylint: disable=broad-except
        content = traceback.format_exc()
        logger.error(f"Error in msg callbacks with error msg: {content}")
        msgs_in_buffer = "".join(
            [f"{msg['type']} with keys {msg.keys()} \n" for msg in self.__msg_buffer]
        )
        logger.debug(f"TimePixFlyBackend: Messages in buffer: {msgs_in_buffer}")
    finally:  # Make sure to always reset the message buffer after processing
        logger.debug(
            "TimePixFlyBackend: Resetting message buffer after processing EndFrame message."
        )
        logger.debug(f"Messages in buffer: {len(self.__msg_buffer)}")
        self.__msg_buffer.clear()
def run_msg_callbacks(self):
    """Invoke every registered callback with the frames of the finished acquisition.

    The first buffered message is the StartFrame, the last one the EndFrame and
    everything in between are DataFrames. An exception raised by one callback is
    logged and does not prevent the remaining callbacks from running.
    """
    frames = self.__msg_buffer
    start_frame, end_frame = frames[0], frames[-1]
    data_frames = frames[1:-1]
    for callback, extra_kwargs in self.callbacks.values():
        try:
            callback(start_frame, data_frames, end_frame, **extra_kwargs)
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(f"Error in callback with error msg: {content}")
if __name__ == "__main__":  # pragma: no cover
    import time

    from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import (
        TimePixFlyMockServer,
    )
    from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
        TimepixEndFrame,
        TimepixStartFrame,
        TimepixXESFrame,
    )

    mock_server = TimePixFlyMockServer()
    timepix = TimepixFlyBackend(
        backend_rest_url="localhost:8452", hostname="localhost", socket_port=3031
    )
    # Frames collected by the callback, keyed by acquisition number.
    start_frames = {}
    xes_frames = {}
    end_frames = {}

    def add_msg_callback(start_frame, data_frames, end_frame, **kwargs):
        """Callback that parses and stores the received messages per acquisition."""
        counter = len(start_frames)
        start_frames[counter] = TimepixStartFrame(**start_frame)
        xes_frames[counter] = [TimepixXESFrame(**data_frame) for data_frame in data_frames]
        end_frames[counter] = TimepixEndFrame(**end_frame)

    try:
        print("TimepixFlyBackend initialized.")
        timepix.on_connected()
        print("TimepixFlyBackend connected.")
        # Parse scan info for OtherConfig
        config = OtherConfigModel(
            output_uri=f"tcp:{timepix.hostname}:{timepix.socket_port}", TRoiStep=1, TRoiN=5000
        )
        # Default pixel map mapping the four corner pixels of a 256x256 chip.
        # BUGFIX: the first index was written as "256 ^ 2 - 1", which is bitwise
        # XOR (= 257), not exponentiation; the last pixel index is 256**2 - 1
        # (= 65535), matching the other corner indices 0, 255 and 255 * 256.
        pixel_map = PixelMap(
            chips=[
                [{"i": 256**2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
                [{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
                [{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
                [{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
            ]
        )
        timepix.add_callback(add_msg_callback)
        timepix.on_stage(other_config=config, pixel_map=pixel_map)
        print("TimepixFlyBackend staged with configuration and pixel map.")
        for ii in range(5):
            print(f"Starting scan {ii + 1}...;")
            time.sleep(1)
            status_1 = timepix.on_trigger()
            # print("TimepixFlyBackend pre-scan started.")
            status_1.wait(timeout=10)
            mock_server.start_acquisition()
            status_2 = timepix.on_trigger_finished()
            status_2.wait(timeout=10)
            # print("Acquisition started on mock server.")
            print("TimepixFlyBackend scan completed.")
        status = timepix.on_complete()
        status.wait(timeout=10)
        print(
            f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
        )
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(f"Error during TimepixFlyBackend operation: {e}")
    finally:
        timepix.on_destroy()
        print("TimepixFlyBackend destroyed.")
@@ -0,0 +1,497 @@
"""
Module that implements a python client interface to the TimePix Fly tpx3app REST API,
and connects to the TimePix Fly WebSocket server to receive status updates.
It provides methods to start, stop, and configure the TimePix detector,
as well as to retrieve pixel maps, other configuration parameters, and
the current state of the detector.
"""
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import Any, Type
import requests
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from websockets import State
from websockets.exceptions import WebSocketException
from websockets.sync.client import ClientConnection, connect
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
LastError,
NetAddresses,
OtherConfigModel,
PixelMap,
PixelMapFromFile,
ProgramState,
TimePixResponse,
Version,
)
logger = bec_logger.logger
# pylint: disable=line-too-long
class TimePixStatusError(Exception):
    """Exception raised when the TimePix detector status was in an unexpected state.

    Raised by the status-callback machinery when the backend reports a state
    from the configured list of error states.
    """
class TimePixFlyStatus(str, enum.Enum):
    """
    Enum representing the status of the TimePix detector.

    The string values mirror the state strings received from the backend over
    the WebSocket connection. NOTE(review): UNDEFINED and EXCEPT are not part of
    the ProgramState literal used by the REST state endpoint — presumably
    client-side/error extensions; confirm against the backend.
    """

    INIT = "init"
    CONFIG = "config"  # idle/configurable; acquisitions start from and return to this state
    SETUP = "setup"
    COLLECT = "collect"
    SHUTDOWN = "shutdown"
    UNDEFINED = "undefined"  # client-side default before any status update was received
    AWAIT_CONNECTION = "await_connection"
    EXCEPT = "except"
class TimepixFlyClient:
    """
    A client for the TimePix fly backend (tpx3app).

    It exposes methods to interact with the REST endpoints of the backend and
    allows attaching ophyd StatusBase callbacks that are resolved dynamically
    based on the backend state received over a WebSocket connection.
    """

    def __init__(self, rest_url: str, ws_url: str):
        """
        Initialize the TimePixFlyClient with a server address.

        Args:
            rest_url (str): The REST API URL for the TimePix Fly backend, e.g., "localhost:8452".
            ws_url (str): The WebSocket URL for the TimePix Fly backend, e.g., "localhost:8452/ws".
        """
        self.rest_url = rest_url
        self.ws_url = ws_url
        self.ws_client: ClientConnection | None = None
        self._rlock = threading.RLock()
        self._timeout = 5  # Default timeout (seconds) for REST requests
        self._status: TimePixFlyStatus = TimePixFlyStatus.UNDEFINED
        self._ws_update_thread: threading.Thread | None = None
        self._shutdown_event = threading.Event()
        # Maps id(status) -> (status, success states, error states); entries are
        # resolved in _run_status_callbacks whenever a new state is received.
        self._status_callbacks: dict[
            str, tuple[StatusBase, list[TimePixFlyStatus], list[TimePixFlyStatus]]
        ] = {}
        self._started: bool = False  # Flag to indicate if the client has started sending data

    #############################
    ### Utility Methods ###
    #############################
    def on_connected(self) -> None:
        """
        Called when the client is connected to the TimePix server.
        This method can be overridden to perform actions when the client connects.

        Raises:
            ConnectionError: If the backend cannot be reached or the WebSocket
                connection cannot be established.
        """
        try:
            self.stop_running_collection()
            self.connect()
            self.wait_for_connection(timeout=5)
        except Exception:
            content = traceback.format_exc()
            logger.error(
                f"Error while checking the state of the TimePix server: {content}. "
                f"Please check the server address and ensure the server is running."
            )
            # pylint: disable=raise-missing-from
            raise ConnectionError(
                f"TimePix Fly client failed to connect to {self.rest_url}. Please check logs for detailed error."
            )

    def stop_running_collection(self):
        """
        Resets the TimePix backend to the configuration state. We check if the backend
        is in the CONFIG state, and if it is not, we stop the current collection.
        We check in addition that the client has not been started before via start()
        REST API call. stop_collect() will reset the flag _started to False.
        """
        state = self.state()
        if state.state != TimePixFlyStatus.CONFIG or self._started is True:
            logger.info(
                f"Stopping running collection on TimePix backend, current state: {state.state}, was started: {self._started}"
            )
            self.stop_collect()

    ##############################
    ### WebSocket Methods ###
    ### Status Update Handling ###
    ##############################
    @property
    def status(self) -> TimePixFlyStatus:
        """
        Get the current status of the TimePix detector.

        Returns:
            TimePixFlyStatus: The current status of the TimePix detector.
        """
        return self._status

    def add_status_callback(
        self,
        status: StatusBase,
        success: list[TimePixFlyStatus],
        error: list[TimePixFlyStatus],
        run: bool = True,
    ):
        """
        Add a StatusBase callback for the TimePix detector. The status will be updated when the
        detector status changes: it is set to finished when the status matches one of the specified
        success statuses and to exception when the status matches one of the specified error statuses.

        Per default, the callback will immediately check and resolve if the status is already in success.

        Args:
            status (StatusBase): StatusBase object
            success (list[TimePixFlyStatus]): list of statuses that indicate success
            error (list[TimePixFlyStatus]): list of statuses that indicate error
            run (bool): If True, the callback will be run immediately if the status is already in success.
                If False, the callback will not be run immediately.
        """
        if run is True:
            try:
                if self.status in success:
                    status.set_finished()
                    return
                if self.status in error:
                    last_error = self.last_error()
                    raise TimePixStatusError(
                        f"Current state {self.status} of TimePixFly Backend is in list of error states: {error}. Last error: {last_error.message}"
                    )
            except Exception as e:  # pylint: disable=broad-except
                logger.error(f"Error while adding status callback: {e}")
                if status.done is False:
                    status.set_exception(e)
                # Bugfix: a status that was just resolved with an exception must
                # not be registered for later updates.
                return
        self._status_callbacks[id(status)] = (status, success, error)

    def connect(self):
        """Start the background thread that listens for updates on the TimePix WebSocket server."""
        if self._ws_update_thread is not None and self._ws_update_thread.is_alive():
            return
        self._ws_update_thread = threading.Thread(target=self._ws_update_loop, daemon=True)
        self._ws_update_thread.start()

    # pylint: disable=raise-missing-from
    def wait_for_connection(self, timeout: float = 6) -> None:
        """
        Wait for the connection to the TimepixFly WebSocket server to be established.

        Args:
            timeout (float): timeout for the request

        Raises:
            TimeoutError: If the server keeps refusing connections until the timeout.
            ConnectionError: If connecting fails with an unexpected error.
        """
        logger.info(
            f"Attempting to connect to TimePixFly WebSocket at {self.ws_url}, with timeout {timeout} seconds."
        )
        with self._rlock:
            start_time = time.time()
            while True:
                if self.ws_client is not None and self.ws_client.state == State.OPEN:
                    return
                try:
                    self.ws_client = connect(f"ws://{self.ws_url}")
                    break
                except ConnectionRefusedError:
                    if time.time() - start_time > timeout:
                        content = traceback.format_exc()
                        logger.error(f"Connection timed out: {content}")
                        raise TimeoutError(
                            f"Timeout while waiting for connection to TimePixFly WebSocket server on {self.ws_url}"
                        )
                except Exception:
                    content = traceback.format_exc()
                    logger.error(
                        f"Failed to connect to TimePixFly WebSocket server on {self.ws_url}: {content}"
                    )
                    raise ConnectionError(
                        f"Failed to connect to TimePixFly WebSocket server on {self.ws_url} with error: {content}"
                    )
                time.sleep(0.5)  # Try to reconnect every 0.5 seconds

    def _ws_update_loop(self):
        """Websocket update loop, runs in a background daemon thread until shutdown."""
        while not self._shutdown_event.is_set():
            self._ws_send_and_receive()

    def _ws_send_and_receive(self):
        """Receive messages from the TimePixFly WebSocket server and dispatch them."""
        if not self.ws_client:
            self.wait_for_connection()
        try:
            try:
                recv_msgs = self.ws_client.recv(timeout=0.1)
            except TimeoutError:
                # No message within the poll interval; not an error.
                return
            logger.trace(f"Received from timepixfly ws: {recv_msgs}")
            if recv_msgs is not None:
                self._on_received_ws_message(recv_msgs)
        except WebSocketException:
            content = traceback.format_exc()
            logger.warning(f"Websocket connection closed unexpectedly: {content}")
            self.wait_for_connection()

    def _on_received_ws_message(self, msg: str):
        """
        Handle a state message received from the TimePixFly backend and run the
        registered status callbacks with the new state.
        """
        try:
            self._status = TimePixFlyStatus(msg)
            logger.info(f"Received TimepixFly status: {self._status.value}")
        except Exception:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(f"Failed to decode websocket message: {content}")
            return
        self._run_status_callbacks()

    def _run_status_callbacks(self):
        """
        Update the registered StatusBase objects based on the current backend status.
        If the status matches one of the success or error statuses, the StatusBase object will
        be set to finished or exception, respectively, and removed from the list of callbacks.
        """
        status = self._status
        # Routine bookkeeping: log at debug level (was warning, which spammed logs
        # on every status update).
        logger.debug(f"Running status callbacks for status: {status.value}")
        callback_ids = list(self._status_callbacks.keys())
        for cb_id in callback_ids:
            dev_status, success, error = self._status_callbacks[cb_id]
            # pylint: disable=protected-access
            with dev_status._lock:
                if dev_status.done:
                    self._status_callbacks.pop(cb_id)
                    continue
                if status in success:
                    dev_status.set_finished()
                    logger.debug(f"Status callback finished in succes: {status.value}")
                    self._status_callbacks.pop(cb_id)
                elif status in error:
                    try:
                        last_error = self.last_error()
                        raise TimePixStatusError(
                            f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
                        )
                    except Exception as e:  # pylint: disable=broad-except
                        logger.error(f"Error in status callback from TimepixFly Backend: {e}")
                        dev_status.set_exception(e)
                        self._status_callbacks.pop(cb_id)
        # Reset the _started flag if the status is in CONFIG.
        if status == TimePixFlyStatus.CONFIG:
            self._started = False  # Should this be made thread-safe?

    def shutdown(self):
        """Shutdown the TimepixFlyClient: stop the update thread and close the WebSocket."""
        self._shutdown_event.set()
        if self.ws_client is not None:
            self.ws_client.close()
            self.ws_client = None

    ############################
    ##### REST API Methods #####
    ############################
    def _get(
        self, get_cmd: str, get_response_model: Type[TimePixResponse] | None = None
    ) -> Type[TimePixResponse] | None:
        """
        Send a GET request to the TimePix server.

        Args:
            get_cmd (str): The command to send in the GET request.
            get_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.

        Returns:
            Any: The parsed response if a model is provided, else the raw response text.

        Raises:
            requests.HTTPError: If the server responds with an error status code.
        """
        logger.debug(f"Sending GET request to TimePix server: {get_cmd}")
        response = requests.get(f"http://{self.rest_url}/{get_cmd}", timeout=self._timeout)
        response.raise_for_status()  # Raise an error for bad responses
        if get_response_model is None:
            return response.text
        try:
            return get_response_model(**response.json())
        except Exception as e:
            logger.info(f"Error parsing response for {get_cmd}: Response: {response.text}")
            raise e

    def _put(
        self, put_cmd: str, value: dict[str, Any], put_response_model: Type[TimePixResponse]
    ) -> Type[TimePixResponse] | None:
        """
        Send a PUT request to the TimePix server.

        Args:
            put_cmd (str): The command to send in the PUT request.
            value (dict[str, Any]): The value to send in the PUT request.
            put_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.

        Returns:
            Any: The parsed response if a model is provided, else None.
        """
        logger.debug(f"Sending PUT request to TimePix server: {put_cmd} with value: {value}")
        response = requests.put(
            f"http://{self.rest_url}/{put_cmd}", json=value, timeout=self._timeout
        )
        response.raise_for_status()
        if put_response_model is not None:
            return put_response_model(**response.json())
        return None  # Explicit: no model requested, nothing to parse

    def start(self) -> None:
        """
        Start the TimePix detector by sending a GET request to the start endpoint.
        This method is a wrapper around the REST API call to start the detector.
        """
        logger.debug("Start called from client")
        self._get(get_cmd="?start=true")
        self._started = True

    def stop(self) -> None:
        """
        Stop the TimePix detector by sending a GET request to the stop endpoint.
        This method is a wrapper around the REST API call to stop the detector.
        """
        self._get(get_cmd="?stop=true")
        self._started = False

    def stop_collect(self) -> None:
        """
        Stop the data collection of the TimePix detector by sending a GET request to the stop-collect endpoint.
        This method is a wrapper around the REST API call to stop data collection.
        """
        self._get(get_cmd="?stop_collect=true")
        self._started = False

    def kill(self) -> None:
        """
        Kill the TimePix detector by sending a GET request to the kill endpoint.
        This method is a wrapper around the REST API call to kill the detector.
        """
        self._get(get_cmd="?kill=true")
        self._started = False

    def last_error(self) -> LastError:
        """
        Get the last error message from the TimePix detector by sending a GET request
        to the last-error endpoint.

        Returns:
            LastError: The last error message from the detector.
        """
        return self._get(get_cmd="last-error", get_response_model=LastError)

    def state(self) -> ProgramState:
        """
        Get the program state of the TimePix detector by sending a GET request
        to the state endpoint.

        Returns:
            ProgramState: The current state of the TimePix detector.
        """
        return self._get(get_cmd="state", get_response_model=ProgramState)

    def version(self) -> Version:
        """
        Get the version of the TimePix detector by sending a GET request
        to the version endpoint.

        Returns:
            Version: The version information of the TimePix detector.
        """
        return self._get(get_cmd="version", get_response_model=Version)

    def get_pixel_map(self) -> PixelMap:
        """
        Get the pixel map of the TimePix detector by sending a GET request
        to the pixel-map endpoint.

        Returns:
            PixelMap: The pixel map of the TimePix detector.
        """
        return self._get(get_cmd="pixel-map", get_response_model=PixelMap)

    def set_pixel_map(self, pixel_map: PixelMap | dict) -> None:
        """
        Set the pixel map of the TimePix detector by sending a PUT request
        to the pixel-map endpoint.

        Args:
            pixel_map (PixelMap | dict): The pixel map to set. Can be a PixelMap instance or a dictionary.

        Raises:
            ValueError: If pixel_map is neither a PixelMap nor a dict.
        """
        if not isinstance(pixel_map, PixelMap):
            if isinstance(pixel_map, dict):
                pixel_map = PixelMap(**pixel_map)
            else:
                raise ValueError(
                    f"Value must be an instance of PixelMap. Received {type(pixel_map)}, {pixel_map}."
                )
        self._put(put_cmd="pixel-map", value=pixel_map.model_dump(), put_response_model=None)

    def set_pixel_map_from_file(self, pixel_map_file: PixelMapFromFile | dict | str) -> None:
        """
        Set the pixel map of the TimePix detector from a file by sending a PUT request
        to the pixel-map-from-file endpoint.

        Args:
            pixel_map_file (PixelMapFromFile | dict | str): The pixel map from a file to set.
                Can be a PixelMapFromFile instance, a dictionary, or a plain file path.

        Raises:
            ValueError: If pixel_map_file is none of the accepted types.
        """
        if not isinstance(pixel_map_file, PixelMapFromFile):
            if isinstance(pixel_map_file, dict):
                pixel_map_file = PixelMapFromFile(**pixel_map_file)
            elif isinstance(pixel_map_file, str):
                # Bugfix: the model's field is named "file"; constructing with
                # "filename=" raised a pydantic ValidationError.
                pixel_map_file = PixelMapFromFile(file=pixel_map_file)
            else:
                raise ValueError(
                    f"Value must be an instance of PixelMapFromFile. Received {type(pixel_map_file)}, {pixel_map_file}."
                )
        self._put(
            put_cmd="pixel-map-from-file",
            value=pixel_map_file.model_dump(),
            put_response_model=PixelMapFromFile,
        )

    def get_other_config(self) -> OtherConfigModel:
        """
        Get the other configuration parameters of the TimePix detector by sending a GET request
        to the other-config endpoint.

        Returns:
            OtherConfigModel: The other configuration parameters of the TimePix detector.
        """
        return self._get(get_cmd="other-config", get_response_model=OtherConfigModel)

    def set_other_config(self, other_config: OtherConfigModel | dict) -> None:
        """
        Set the other configuration parameters of the TimePix detector by sending a PUT request
        to the other-config endpoint.

        Args:
            other_config (OtherConfigModel | dict): The other configuration parameters to set.
                Can be an OtherConfigModel instance or a dictionary.

        Raises:
            ValueError: If other_config is neither an OtherConfigModel nor a dict.
        """
        if not isinstance(other_config, OtherConfigModel):
            if isinstance(other_config, dict):
                other_config = OtherConfigModel(**other_config)
            else:
                raise ValueError(
                    f"Value must be an instance of OtherConfigModel. Received {type(other_config)}, {other_config}."
                )
        self._put(put_cmd="other-config", value=other_config.model_dump(), put_response_model=None)

    def get_net_addresses(self) -> NetAddresses:
        """
        Get the network addresses of the TimePix detector by sending a GET request
        to the net-addresses endpoint.

        Returns:
            NetAddresses: The network addresses of the TimePix detector.
        """
        return self._get(get_cmd="net-addresses", get_response_model=NetAddresses)
@@ -0,0 +1,182 @@
"""
This module defines Pydantic models for the TimePix detector API responses. These
models are used to validate and structure the data returned by the TimePix REST API.
Any change will be reflected immediately, which will simplify debugging if the API changes.
"""
from typing import Literal
from pydantic import BaseModel, Field
# pylint: disable=line-too-long
class TimePixResponse(BaseModel):
    """Base model for TimePix responses.

    All response models inherit from this class; ``validate_assignment`` makes
    pydantic re-validate fields when they are assigned after construction.
    """

    model_config = {"validate_assignment": True}
class OtherConfigModel(TimePixResponse):
    """
    OtherConfigModel is a Pydantic model that represents the configuration
    for the TimePix detector.

    Attributes:
    - type: str - The type of the configuration, default is "OtherConfig".
    - output_uri: str - The URI for the data stream. The backend will send data to this address.
      It is the responsibility of the device to start a TCP server and listen on this socket to
      receive the data.
    - save_interval: int - The interval at which histograms are written.
    - TRoiStart: int - The start time for the Time ROI (Region of Interest).
    - TRoiStep: int - The step size for the Time ROI.
    - TRoiN: int - The number of points in the Time ROI.
    """

    type: str = "OtherConfig"
    # e.g. "tcp:<host>:<port>"; the device must listen on this address.
    output_uri: str
    save_interval: int = Field(
        default=131000, description="Interval in seconds to write histograms"
    )
    # NOTE(review): TRoi* field names are non-snake-case, presumably to match the
    # backend REST API payload verbatim — confirm before renaming.
    TRoiStart: int = Field(
        default=0, description="Start time for the Time ROI (Region of Interest)"
    )
    TRoiStep: int = Field(default=1, description="Step size for the Time ROI")
    TRoiN: int = Field(default=5000, description="Number of points in the Time ROI")
class LastError(TimePixResponse):
    """
    LastError is a Pydantic model that represents the last error message
    from the TimePix detector api for the REST API call '/last-error'.

    Attributes:
    - type: str - The type of the response, default is "LastError".
    - message: str - The last error message from the detector.
    """

    type: str = "LastError"
    message: str
class ProgramState(TimePixResponse):
    """
    ProgramState is a Pydantic model that represents the state of the TimePix program,
    returned by the REST API call '/state'.

    Attributes:
    - type: str - The type of the response, default is "ProgramState".
    - state: str - The current state of the program; one of "init", "config",
      "setup", "await_connection", "collect", "shutdown".
    """

    type: str = "ProgramState"
    state: Literal["init", "config", "setup", "await_connection", "collect", "shutdown"]
class Version(TimePixResponse):
    """
    Version is a Pydantic model that represents the version information of the TimePix detector.

    Attributes:
    - type: str - The type of the response, default is "Version".
    - version: str - The version string of the TimePix detector.
    """

    type: str = "Version"
    version: str
class PixelMapFromFile(TimePixResponse):
    """
    PixelMapFromFile is a Pydantic model that represents a pixel map loaded from a file.

    Attributes:
    - type: str - The type of the response, default is "PixelMapFromFile".
    - file: str - The path to the file containing the pixel map.
    """

    type: str = "PixelMapFromFile"
    file: str
class PixelMap(TimePixResponse):
    """
    PixelMap is a Pydantic model that represents the pixel mapping for the TimePix detector.

    Attributes:
    - type: str - The type of the response, default is "PixelMap".
    - chips: list - A list of chips, each containing a list of pixel mappings.
    """

    type: str = "PixelMap"
    # Each mapping dict carries keys "i", "p" and "f" -- presumably pixel
    # index/coordinates, row index(es) and fraction(s), matching the ROI
    # pixel-hit format used elsewhere in this package; TODO confirm.
    chips: list[list[dict[Literal["i", "p", "f"], int | float | list[int | float]]]]
# For efficiency, we do not parse the responses into Pydantic models, but use the dict from
# json directly. Nevertheless, we define the models here to have a common interface
# and to be able to use them in the future if needed.
class TimepixStartFrame(TimePixResponse):
    """TimepixStartFrame is a Pydantic model that represents the start frame of a TimePix acquisition.

    Attributes:
    - type: str - The type of the response, default is "StartFrame".
    - Mode: Literal["TOA"] - Acquisition mode; only time-of-arrival ("TOA") is accepted.
    - TRoiStart: int - Start time for the Time ROI (Region of Interest).
    - TRoiStep: int - Step size for the Time ROI.
    - TRoiN: int - Number of points in the Time ROI.
    - NumEnergyPoints: int - Number of energy points in the acquisition.
    - save_interval: int - Interval at which histograms are written.
    """

    type: str = "StartFrame"
    Mode: Literal["TOA"]
    TRoiStart: int
    TRoiStep: int
    TRoiN: int
    NumEnergyPoints: int
    save_interval: int
class TimepixXESFrame(TimePixResponse):
    """TimepixXESFrame is a Pydantic model that represents a data frame from the TimePix detector.

    Attributes:
    - type: str - The type of the response, default is "XesData".
    - period: int - Period counter for this frame.
    - TDSpectra: list[float] - Time-resolved spectrum values for this period.
    - totalEvents: int - Total number of events recorded in this frame.
    - beforeROI: int - Number of events arriving before the Time ROI.
    - afterROI: int - Number of events arriving after the Time ROI.
    """

    type: str = "XesData"
    period: int
    TDSpectra: list[float]
    totalEvents: int
    beforeROI: int
    afterROI: int
class TimepixEndFrame(TimePixResponse):
    """
    TimepixEndFrame is a Pydantic model that represents the end frame of a TimePix acquisition.

    Attributes:
    - type: str - The type of the response, default is "EndFrame".
    - error: str - If an error occurred during acquisition, this field contains the error message. Empty string otherwise.
    - periods: int - The last period count minus 3 (3 being the period predictor delay).
    """

    type: str = "EndFrame"
    error: str
    periods: int  # Last period count minus 3 (3 being the period predictor delay)
# Implemented a GET /net-addresses call
# // /net-addresses GET applicable net addresses
# // GET return:
# //  - status 200
# //  - data
# //    {
# //      "type":"NetAddresses",
# //      "control":"127.0.0.1:8452", // own rest interface
# //      "address":"127.0.0.1:8451", // own address, the destination of ASI server raw data
# //      "server":"127.0.0.1:8080" // ASI server rest interface address
# //    }
class NetAddresses(TimePixResponse):
    """
    NetAddresses is a Pydantic model that represents the network addresses used by the TimePix detector.

    Attributes:
    - type: str - The type of the response, default is "NetAddresses".
    - control: str - The address of the REST interface for control commands.
    - address: str - The address where the ASI server sends raw data.
    - server: str - The address of the ASI server's REST interface.
    """

    type: str = "NetAddresses"
    control: str  # timepix_rest_host
    address: str  # data_socket_for_asi
    server: str  # asi_rest_host
+382
View File
@@ -0,0 +1,382 @@
"""Temporary utility module for Status Object implementations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd import Device, DeviceStatus, StatusBase
class AndStatusWithList(DeviceStatus):
    """
    Custom implementation of the AndStatus that combines the
    option to add multiple statuses as a list, and in addition
    allows for adding the Device as an object to access its
    methods.

    The combined status finishes successfully once every child status has
    finished successfully, and fails as soon as any child fails.

    Args:
        device (Device): Device associated with this status; its name is used
            in the timeout error message of _run_callbacks.
        status_list: A single status or a list of statuses to combine.
        **kwargs: Forwarded to DeviceStatus (e.g. timeout, settle_time).
    """

    def __init__(
        self,
        device: Device,
        status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
        **kwargs,
    ):
        # Normalize to a list so a single status can be passed directly.
        self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
        super().__init__(device=device, **kwargs)
        self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]

        def inner(status):
            # Child-status callback: fail fast on the first failed child,
            # finish once every child is done and successful.
            with self._lock:
                if self._externally_initiated_completion:
                    return
                if self.done:  # Return if status is already done.. It must be resolved already
                    return
                for st in self.all_statuses:
                    with st._lock:
                        if st.done and not st.success:
                            self.set_exception(st.exception())  # st._exception
                            return
                if all(st.done for st in self.all_statuses) and all(
                    st.success for st in self.all_statuses
                ):
                    self.set_finished()

        # Register the resolver on every child. NOTE(review): add_callback is
        # invoked while holding the child's lock -- confirm this cannot
        # deadlock with callbacks fired from the child's own thread.
        for st in self.all_statuses:
            with st._lock:
                st.add_callback(inner)

    # TODO improve __repr__ and __str__
    def __repr__(self):
        return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)

    def __str__(self):
        return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)

    def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
        # Recursive membership test: also searches nested AndStatusWithList
        # children.
        for child in self.all_statuses:
            if child == status:
                return True
            if isinstance(child, AndStatusWithList):
                if status in child:
                    return True
        return False

    # # TODO Check if this actually works....
    # def set_exception(self, exc):
    #     # Propagate the exception to all sub-statuses that are not done yet.
    #
    #     with self._lock:
    #         if self._externally_initiated_completion:
    #             return
    #         if self.done:  # Return if status is already done.. It must be resolved already
    #             return
    #         super().set_exception(exc)
    #         for st in self.all_statuses:
    #             with st._lock:
    #                 if not st.done:
    #                     st.set_exception(exc)

    def _run_callbacks(self):
        """
        Set the Event and run the callbacks.

        Mirrors ophyd's StatusBase._run_callbacks, but raises a clearer
        timeout message that includes the device name.
        """
        if self.timeout is None:
            timeout = None
        else:
            timeout = self.timeout + self.settle_time
        if not self._settled_event.wait(timeout):
            self.log.warning("%r has timed out", self)
            with self._externally_initiated_completion_lock:
                # Only install a timeout error if no exception was set
                # externally before we grabbed the lock.
                if self._exception is None:
                    exc = TimeoutError(
                        f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
                    )
                    self._exception = exc
        # Mark this as "settled".
        try:
            self._settled()
        except Exception:
            self.log.exception("%r encountered error during _settled()", self)
        with self._lock:
            self._event.set()
        if self._exception is not None:
            try:
                self._handle_failure()
            except Exception:
                self.log.exception("%r encountered an error during _handle_failure()", self)
        # The callbacks have access to self, from which they can distinguish
        # success or failure.
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                self.log.exception(
                    "An error was raised on a background thread while "
                    "running the callback %r(%r).",
                    cb,
                    self,
                )
        self._callbacks.clear()
class AndStatus(StatusBase):
    """Custom AndStatus for the TimePix detector.

    Combines several statuses into one that finishes once ALL children have
    finished successfully, and fails as soon as any child fails. Unlike
    ophyd's built-in AndStatus, both sides accept lists of statuses and an
    optional name (or Device) used in timeout error messages.

    Args:
        left: A status, a list of statuses, or None.
        name: A name for this status, or a Device whose ``.name`` is used.
            Defaults to "unnamed_status".
        right: Optional additional status(es) to combine with ``left``.
        **kwargs: Forwarded to StatusBase (e.g. timeout, settle_time).
    """

    def __init__(
        self,
        left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
        name: str | Device | None = None,
        right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
        **kwargs,
    ):
        # Normalize both sides to lists. A None side contributes no statuses
        # (the original wrapped None as [None], which later crashed on
        # attribute access).
        if left is None:
            self.left = []
        else:
            self.left = left if isinstance(left, list) else [left]
        if right is None:
            self.right = []
        else:
            self.right = right if isinstance(right, list) else [right]
        self.all_statuses = self.left + self.right
        if name is None:
            name = "unnamed_status"  # fixed typo: was "unname_status"
        elif isinstance(name, Device):
            name = name.name
        self.name = name
        super().__init__(**kwargs)
        self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
        self._trace_attributes["right"] = [st._trace_attributes for st in self.right]

        def inner(status):
            # Child-status callback: fail fast on the first failed child,
            # finish once every child is done and successful.
            with self._lock:
                if self._externally_initiated_completion:
                    return
                if self.done:  # Already resolved; nothing to do.
                    return
                for st in self.all_statuses:
                    with st._lock:
                        if st.done and not st.success:
                            self.set_exception(st.exception())
                            return
                if all(st.done for st in self.all_statuses) and all(
                    st.success for st in self.all_statuses
                ):
                    self.set_finished()

        if not self.all_statuses:
            # No children at all: nothing to wait for, resolve immediately
            # (otherwise no callback would ever fire and we would only time out).
            self.set_finished()
        for st in self.all_statuses:
            with st._lock:
                st.add_callback(inner)

    def __repr__(self):
        return "({self.left!r} & {self.right!r})".format(self=self)

    def __str__(self):
        return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)

    def __contains__(self, status: StatusBase) -> bool:
        # Bug fix: the original iterated over [self.left, self.right] -- two
        # *list* objects -- so the equality check compared a list against a
        # status and the isinstance(child, AndStatus) recursion never
        # triggered. Walk the flattened children instead (matching
        # AndStatusWithList.__contains__).
        for child in self.all_statuses:
            if child == status:
                return True
            if isinstance(child, AndStatus) and status in child:
                return True
        return False

    def _run_callbacks(self):
        """
        Set the Event and run the callbacks.

        Mirrors ophyd's StatusBase._run_callbacks, but raises a clearer
        timeout message that includes this status' name.
        """
        if self.timeout is None:
            timeout = None
        else:
            timeout = self.timeout + self.settle_time
        if not self._settled_event.wait(timeout):
            # We have timed out. It's possible that set_finished() has already
            # been called but we got here before the settle_time timer expired.
            # And it's possible that between the above statement timing out and
            # grabbing the lock just below, set_exception(exc) has been called.
            # Both of these possibilities are accounted for.
            self.log.warning("%r has timed out", self)
            with self._externally_initiated_completion_lock:
                # Set the exception and mark the Status as done, unless
                # set_exception(exc) was called externally before we grabbed
                # the lock.
                if self._exception is None:
                    exc = TimeoutError(
                        f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
                    )
                    self._exception = exc
        # Mark this as "settled".
        try:
            self._settled()
        except Exception:
            # No alternative but to log this. We can't supersede set_exception,
            # and we have to continue and run the callbacks.
            self.log.exception("%r encountered error during _settled()", self)
        # Now we know whether or not we have succeeded or failed, either by
        # timeout above or by set_exception(exc), so we can set the Event that
        # will mark this Status as done.
        with self._lock:
            self._event.set()
        if self._exception is not None:
            try:
                self._handle_failure()
            except Exception:
                self.log.exception("%r encountered an error during _handle_failure()", self)
        # The callbacks have access to self, from which they can distinguish
        # success or failure.
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                self.log.exception(
                    "An error was raised on a background thread while "
                    "running the callback %r(%r).",
                    cb,
                    self,
                )
        self._callbacks.clear()
# from __future__ import annotations
# from collections import defaultdict
# from typing import Dict, List, Tuple
# import numpy as np
# ROI = List[Tuple[float, float]]
# def order_roi_corners_simple(roi: ROI) -> np.ndarray:
# """Order ROI corners as [top-left, top-right, bottom-right, bottom-left]."""
# pts = np.array(roi, dtype=float)
# cx, cy = pts.mean(axis=0)
# angles = np.arctan2(pts[:, 1] - cy, pts[:, 0] - cx)
# idx = np.argsort(angles)
# ordered = pts[idx]
# # Ensure clockwise order
# if np.cross(ordered[1] - ordered[0], ordered[2] - ordered[0]) < 0:
# ordered = ordered[::-1]
# return ordered[:4]
# def compute_affine_transform(
# src: np.ndarray, dst: np.ndarray, preserve_scale: bool = True
# ) -> np.ndarray:
# """Compute affine transform mapping src -> dst. Optionally preserve pixel scale."""
# if preserve_scale:
# # Solve for rotation+translation only
# A = np.array(
# [
# [src[0, 0], -src[0, 1], 1, 0],
# [src[0, 1], src[0, 0], 0, 1],
# [src[1, 0], -src[1, 1], 1, 0],
# [src[1, 1], src[1, 0], 0, 1],
# ]
# )
# b = dst[:2].ravel()
# x, residuals, _, _ = np.linalg.lstsq(A, b, rcond=None)
# a, b_, tx, ty = x
# return np.array([[a, -b_, tx], [b_, a, ty]])
# else:
# # Full affine transform
# src_h = np.hstack([src, np.ones((4, 1))])
# dst_h = dst
# M, _, _, _ = np.linalg.lstsq(src_h, dst_h, rcond=None)
# return M.T
# def apply_affine_transform(coords: np.ndarray, affine: np.ndarray) -> np.ndarray:
# """Apply affine transform to coordinates."""
# coords_h = np.hstack([coords, np.ones((coords.shape[0], 1))])
# transformed = coords_h @ affine.T
# return transformed[:, :2]
# def roi_pixel_hits(
# image_shape: Tuple[int, int], roi: ROI, start_idx: int = 0, min_fraction_diff: float = 0.0
# ) -> List[Dict]:
# """
# For each ROI, return list of hits as dicts {'i': (x,y), 'p': row_idx, 'f': fraction}.
# Supports rotated rectangles using bilinear fraction splitting.
# """
# hits_dict: Dict[Tuple[int, int], Dict[str, list]] = defaultdict(lambda: {"p": [], "f": []})
# corners = order_roi_corners_simple(roi)
# height = int(np.linalg.norm(corners[0] - corners[3])) + 1
# width = int(np.linalg.norm(corners[0] - corners[1])) + 1
# dst = np.array([[0, 0], [width - 1, 0], [width - 1, height - 1], [0, height - 1]], dtype=float)
# affine_mat = compute_affine_transform(corners, dst) # 2x3
# # Bounding box in image
# min_x = max(int(np.floor(corners[:, 0].min())), 0)
# max_x = min(int(np.ceil(corners[:, 0].max())), image_shape[1] - 1)
# min_y = max(int(np.floor(corners[:, 1].min())), 0)
# max_y = min(int(np.ceil(corners[:, 1].max())), image_shape[0] - 1)
# yy, xx = np.meshgrid(np.arange(min_y, max_y + 1), np.arange(min_x, max_x + 1), indexing="ij")
# coords = np.stack([xx.ravel(), yy.ravel()], axis=1) # N x 2
# local_coords = apply_affine_transform(coords, affine_mat)
# x_local = local_coords[:, 0]
# y_local = local_coords[:, 1]
# # Keep pixels inside ROI rectangle
# mask = (x_local >= 0) & (x_local <= width - 1) & (y_local >= 0) & (y_local <= height - 1)
# coords_in = coords[mask]
# x_in = x_local[mask]
# y_in = y_local[mask]
# # Bilinear fractions
# x0 = np.floor(x_in).astype(int)
# y0 = np.floor(y_in).astype(int)
# dx = x_in - x0
# dy = y_in - y0
# for coord, x0i, y0i, dxv, dyv in zip(coords_in, x0, y0, dx, dy):
# row_base = start_idx
# # Contributions to 4 neighboring "rows"
# contributions = [
# (row_base + y0i, (1 - dxv) * (1 - dyv)),
# (row_base + y0i, dxv * (1 - dyv)),
# (row_base + y0i + 1, (1 - dxv) * dyv),
# (row_base + y0i + 1, dxv * dyv),
# ]
# # Filter negligible contributions
# contributions = [(p, f) for p, f in contributions if f >= min_fraction_diff]
# # Normalize fractions to sum 1
# if contributions:
# total_f = sum(f for _, f in contributions)
# contributions = [(p, f / total_f) for p, f in contributions]
# for p, f in contributions:
# key = (int(coord[0]), int(coord[1]))
# hits_dict[key]["p"].append(int(p))
# hits_dict[key]["f"].append(float(f))
# hits_roi = [{"i": key, "p": value["p"], "f": value["f"]} for key, value in hits_dict.items()]
# return hits_roi
# if __name__ == "__main__":
# image_shape = (512, 512)
# rois = [
# [(25, 25), (50, 50), (50, 25), (75, 50)], # rotated 45 degrees rectangle
# [(300, 300), (400, 300), (400, 400), (300, 400)], # upright rectangle
# ]
# hits_0 = roi_pixel_hits(image_shape, rois[0], min_fraction_diff=0.1)
# hits_1 = roi_pixel_hits(image_shape, rois[1], start_idx=10, min_fraction_diff=0.2)
# print(hits_0[:5])
# print(hits_1[:5])
+1
View File
@@ -0,0 +1 @@
from .superxas_nexus_structure import SuperXASNexusStructure
@@ -0,0 +1,308 @@
from bec_server.file_writer.default_writer import DefaultFormat
import superxas_bec.bec_widgets.widgets.x10da_parameters as bl
class SuperXASNexusStructure(DefaultFormat):
    """Nexus Structure for SuperXAS.

    Builds the NeXus-style group hierarchy under /entry for the SuperXAS
    beamline file writer. Static metadata is written as datasets; live
    device readings are referenced via soft links into /entry/collection/...
    Device-specific sections are only written when the corresponding device
    is present in the device manager.
    """

    def format(self) -> None:
        """Specify the file format for the file writer."""
        entry = self.storage.create_group(name="entry")
        entry.attrs["NX_class"] = "NXentry"
        instrument = entry.create_group(name="instrument")
        instrument.attrs["NX_class"] = "NXinstrument"
        ##################
        ## source specific information
        ###################
        source = instrument.create_group(name="source")
        source.attrs["NX_class"] = "NXsource"
        beamline_name = source.create_dataset(name="beamline_name", data="SuperXAS")
        beamline_name.attrs["NX_class"] = "NX_CHAR"
        facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
        facility_name.attrs["NX_class"] = "NX_CHAR"
        probe = source.create_dataset(name="probe", data="X-ray")
        probe.attrs["NX_class"] = "NX_CHAR"
        # Ring current is only linked if the 'curr' device is configured.
        if "curr" in self.device_manager.devices:
            ring_current = source.create_soft_link(
                name="ring_current",
                target="/entry/collection/devices/curr/curr/value",
            )
            ring_current.attrs["NX_class"] = "NX_FLOAT"
            ring_current.attrs["units"] = "mA"
        ###################
        ## mo1_bragg specific information
        ###################
        ## Logic if device exist
        if "mo1_bragg" in self.device_manager.devices:
            monochromator = instrument.create_group(name="monochromator")
            monochromator.attrs["NX_class"] = "NXmonochromator"
            crystal = monochromator.create_group(name="crystal")
            crystal.attrs["NX_class"] = "NXcrystal"
            # Create a dataset
            # NOTE(review): "chemical_formular" looks like a typo for the
            # NXcrystal field "chemical_formula" -- kept as-is since
            # downstream readers may already expect this key; confirm.
            chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
            chemical_formular.attrs["NX_class"] = "NX_CHAR"
            reflection = crystal.create_soft_link(
                name="reflection",
                target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
            )
            reflection.attrs["NX_class"] = "NX_CHAR"
            # Create a softlink
            d_spacing = crystal.create_soft_link(
                name="d_spacing",
                target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
            )
            d_spacing.attrs["NX_class"] = "NX_FLOAT"
            d_spacing.attrs["units"] = "angstrom"
            bragg_offset = crystal.create_soft_link(
                name="bragg_offset",
                target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_bragg_off/value",
            )
            bragg_offset.attrs["NX_class"] = "NX_FLOAT"
            bragg_offset.attrs["units"] = "degree"
        ###################
        ### cm mirror specific information
        ####################
        collimating_mirror = instrument.create_group(name="collimating_mirror")
        collimating_mirror.attrs["NX_class"] = "NXmirror"
        cm_substrate_material = collimating_mirror.create_dataset(
            name="substrate_material", data="Si"
        )
        cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
        # previous error due to space in name field
        if "cm_bnd" in self.device_manager.devices:
            cm_bending = collimating_mirror.create_soft_link(
                name="sagittal_radius_bender_motor",
                target="/entry/collection/devices/cm_bnd/cm_bnd/value",
            )
            cm_bending.attrs["NX_class"] = "NX_FLOAT"
            cm_bending.attrs["units"] = "steps"
        if "cm_rotx" in self.device_manager.devices:
            cm_incidence_angle = collimating_mirror.create_soft_link(
                name="incidence_angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
            )
            cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
            cm_incidence_angle.attrs["units"] = "mrad"
        if "cm_roty" in self.device_manager.devices:
            cm_yaw_angle = collimating_mirror.create_soft_link(
                name="yaw_angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
            )
            cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
            cm_yaw_angle.attrs["units"] = "mrad"
        if "cm_rotz" in self.device_manager.devices:
            cm_roll_angle = collimating_mirror.create_soft_link(
                name="roll_angle", target="/entry/collection/devices/cm_rotz/cm_rotz/value"
            )
            cm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
            cm_roll_angle.attrs["units"] = "mrad"
        if 'cm_trx' in self.device_manager.devices:
            # Map the (negated) cm_trx translation onto a mirror stripe name
            # using the beamline parameter tables in bl.cm.
            cm_trx = - self.device_manager.devices.cm_trx.read(cached=True).get('cm_trx').get('value')
            stripe = 'Unknown'
            for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
                if low <= cm_trx <= high:
                    stripe = name
            cm_stripe = collimating_mirror.create_dataset(
                name="stripe", data=stripe
            )
            cm_stripe.attrs["NX_class"] = "NX_CHAR"
        ###################
        ### fm mirror specific information
        ####################
        focusing_mirror = instrument.create_group(name="focusing_mirror")
        focusing_mirror.attrs["NX_class"] = "NXmirror"
        fm_substrate_material = focusing_mirror.create_dataset(
            name="substrate_material", data="Si"
        )
        fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
        if "fm_bnd" in self.device_manager.devices:
            fm_bending = focusing_mirror.create_soft_link(
                name="sagittal_radius_bender_motor",
                target="/entry/collection/devices/fm_bnd/fm_bnd/value",
            )
            fm_bending.attrs["NX_class"] = "NX_FLOAT"
            fm_bending.attrs["units"] = "steps"
        if "fm_rotx" in self.device_manager.devices:
            fm_incidence_angle = focusing_mirror.create_soft_link(
                name="incidence_angle", target="/entry/collection/devices/fm_rotx/fm_rotx/value"
            )
            fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
            fm_incidence_angle.attrs["units"] = "mrad"
        if "fm_roty" in self.device_manager.devices:
            fm_yaw_angle = focusing_mirror.create_soft_link(
                name="yaw_angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
            )
            fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
            fm_yaw_angle.attrs["units"] = "mrad"
        if "fm_rotz" in self.device_manager.devices:
            fm_roll_angle = focusing_mirror.create_soft_link(
                name="roll_angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
            )
            fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
            fm_roll_angle.attrs["units"] = "mrad"
        if 'fm_trx' in self.device_manager.devices:
            fm_trx = - self.device_manager.devices.fm_trx.read(cached=True).get('fm_trx').get('value')
            stripe = 'Unknown'
            # NOTE(review): the fm limits are zipped as (limOptXToroid[1],
            # limOptXToroid[0]) -- the opposite order of the cm block above.
            # Confirm this is an intentional sign/orientation convention and
            # not a copy-paste slip.
            for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
                if low <= fm_trx <= high:
                    stripe = name + ' (toroid)'
            fm_stripe = focusing_mirror.create_dataset(
                name="stripe", data=stripe
            )
            fm_stripe.attrs["NX_class"] = "NX_CHAR"
        ###################
        ## nidaq specific information
        ###################
        ## Logic if device exist
        if "nidaq" in self.device_manager.devices:
            # Channel-enable bitmasks, read from the scan configuration (the
            # commented lines show the previous live-device readout variant).
            #ai_chans_bits = self.device_manager.devices.nidaq.ai_chans.read(cached=True).get("nidaq_ai_chans").get("value")
            ai_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ai_chans", {}).get("value")
            ci_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_ci_chans", {}).get("value")
            #add_chans_bits = self.device_manager.devices.nidaq.add_chans.read(cached=True).get("nidaq_add_chans").get("value")
            add_chans_bits = self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
            measurement_mode = entry.create_group(name="mode")
            measurement_mode.attrs["NX_class"] = "NX_CHAR"
            # CI bits 0..6 select the seven Rayspec SDD elements.
            if (int(ci_chans_bits) & 0x7F) != 0:
                # Create a dataset
                rayspec_sdd_active = measurement_mode.create_group(name="Multi_Element_Partial_Fluorescence_Yield")
                me_sdd = rayspec_sdd_active.create_dataset(name="Detector", data="Rayspec 7 element Silicon Drift Detector")
                me_sdd.attrs["NX_class"] = "NX_CHAR"
            # CI bit 8 selects the single-element Ketek SDD.
            if (int(ci_chans_bits) & (1<<8)) != 0:
                # Create a dataset
                ketek_sdd_active = measurement_mode.create_group(name="Single_Element_Partial_Fluorescence_Yield")
                se_sdd = ketek_sdd_active.create_dataset(name="Detector", data="Ketex mini single element Silicon Drift Detector")
                se_sdd.attrs["NX_class"] = "NX_CHAR"
            # AI bit 6: PIPS detector for total fluorescence yield.
            if ((int(ai_chans_bits) & (1<<6)) != 0):
                # Create a dataset
                pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
                tfy = pips_active.create_dataset(name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector")
                tfy.attrs["NX_class"] = "NX_CHAR"
            # AI bits 0 and 2: ionisation chambers before/after the sample.
            if ((int(ai_chans_bits) & (1<<0)) != 0) & ((int(ai_chans_bits) & (1<<2)) != 0):
                # Create a dataset
                ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
                sam_trans = ai0ai2_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
                sam_trans.attrs["NX_class"] = "NX_CHAR"
            # AI bits 2 and 4: ionisation chambers around the reference foil.
            if ((int(ai_chans_bits) & (1<<2)) != 0) & ((int(ai_chans_bits) & (1<<4)) != 0):
                # Create a dataset
                ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
                ref_trans = ai2ai4_active.create_dataset(name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers")
                ref_trans.attrs["NX_class"] = "NX_CHAR"
            main_data = entry.create_group(name="data")
            main_data.attrs["NX_class"] = "NXdata"
            ##################
            ## energy, test whether the signal exists. how to check from config?
            ###################
            # NOTE(review): a group named "energy" is created and then a soft
            # link with the same name is added under the same parent --
            # confirm the storage backend accepts both, otherwise one entry
            # clobbers the other. Same pattern repeats for i0/i1/i2 etc.
            energy = main_data.create_group(name="energy")
            energy.attrs["NX_class"] = "NXdata"
            energy.attrs["units"] = "eV"
            main_data.create_soft_link(name="energy", target="/entry/collection/readout_groups/async/nidaq/nidaq_energy/value")
            ##################
            ## i0
            ###################
            if (int(ai_chans_bits) & (1<<0)) !=0:
                i0 = main_data.create_group(name="i0")
                i0.attrs["NX_class"] = "NXdata"
                i0.attrs["units"] = "V"
                main_data.create_soft_link(name="i0", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value")
            ##################
            ## i1
            ###################
            if (int(ai_chans_bits) & (1<<2)) !=0:
                i1 = main_data.create_group(name="i1")
                i1.attrs["NX_class"] = "NXdata"
                i1.attrs["units"] = "V"
                main_data.create_soft_link(name="i1", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value")
            ##################
            ## i2
            ###################
            if (int(ai_chans_bits) & (1<<4)) !=0:
                i2 = main_data.create_group(name="i2")
                i2.attrs["NX_class"] = "NXdata"
                i2.attrs["units"] = "V"
                main_data.create_soft_link(name="i2", target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value")
            ##################
            ## ci sum
            ###################
            if int(ci_chans_bits) > 0:
                ci_sum = main_data.create_group(name="Fluorescence_Sum")
                ci_sum.attrs["NX_class"] = "NXdata"
                ci_sum.attrs["units"] = "counts"
                main_data.create_soft_link(name="Fluorescence_Sum", target="/entry/collection/readout_groups/async/nidaq/nidaq_cisum/value")
            ##################
            ## mu sample, test whether the signal exists. how to check from config?
            ###################
            if (int(add_chans_bits) & (1<<0)) !=0:
                mu_sample = main_data.create_group(name="mu_sample")
                mu_sample.attrs["NX_class"] = "NXdata"
                main_data.create_soft_link(name="mu_sample", target="/entry/collection/readout_groups/async/nidaq/nidaq_smpl_abs/value")
            ##################
            ## mu reference, test whether the signal exists. how to check from config?
            ###################
            if (int(add_chans_bits) & (1<<1)) !=0:
                mu_reference = main_data.create_group(name="mu_reference")
                mu_reference.attrs["NX_class"] = "NXdata"
                main_data.create_soft_link(name="mu_reference", target="/entry/collection/readout_groups/async/nidaq/nidaq_ref_abs/value")
+9 -1
View File
@@ -1 +1,9 @@
from .exafs_scan import EXAFSScan
from .exafs_scan import EXAFSScan
from .mono_bragg_scans import (
XASAdvancedScan,
# XASAdvancedScanWithXRD,
XASSimpleScan,
# XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import NIDAQContinuousScan
@@ -1,6 +1,6 @@
# from .metadata_schema_template import ExampleSchema
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
METADATA_SCHEMA_REGISTRY = {
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
@@ -9,4 +9,4 @@ METADATA_SCHEMA_REGISTRY = {
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None
DEFAULT_SCHEMA = None
@@ -0,0 +1,8 @@
from bec_lib.metadata_schema import BasicScanMetadata
#
#
class xas_simple_scan_schema(BasicScanMetadata):
    """Metadata schema for the xas_simple_scan: requires the absorption edge and element.

    NOTE(review): the snake_case class name and capitalized field names
    deviate from PEP 8 / pydantic conventions; kept for compatibility with
    the existing metadata registry entries.
    """

    # Absorption edge label, e.g. "K" or "L3" -- TODO confirm expected values.
    Edge: str
    # Chemical element symbol, e.g. "Fe" -- TODO confirm expected format.
    Element: str
+338
View File
@@ -0,0 +1,338 @@
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class XASSimpleScan(AsyncFlyScanBase):
    """Class for the XAS simple scan.

    Oscillates the mono Bragg motor between two energies as a fly scan while
    periodically reading out the monitored devices.
    """

    scan_name = "xas_simple_scan"
    scan_type = "fly"
    scan_report_hint = "device_progress"
    # NOTE(review): mutable class attribute shared by all instances -- fine as
    # long as it is never mutated per instance.
    required_kwargs = []
    use_scan_progress_report = False
    pre_move = False
    gui_config = {
        "Movement Parameters": ["start", "stop"],
        "Scan Parameters": ["scan_time", "scan_duration"],
    }

    def __init__(
        self,
        start: float,
        stop: float,
        scan_time: float,
        scan_duration: float,
        motor: DeviceBase = "mo1_bragg",
        **kwargs,
    ):
        """The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.

        Start and Stop define the energy range for the scan, scan_time is the time for one scan
        cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
        scan will run infinitely.

        Args:
            start (float): Start energy for the scan.
            stop (float): Stop energy for the scan.
            scan_time (float): Time for one scan cycle.
            scan_duration (float): Duration of the scan.
            motor (DeviceBase, optional): Motor device to be used for the scan.
                Defaults to "mo1_bragg".

        Examples:
            >>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
        """
        super().__init__(**kwargs)
        self.motor = motor
        self.start = start
        self.stop = stop
        self.scan_time = scan_time
        self.scan_duration = scan_duration
        # Sleep interval (s) between monitored readouts in scan_core.
        self.primary_readout_cycle = 1

    def stage(self):
        """call the stage procedure"""
        # Compute position for mo1_gonio pre move
        # Since energy is not linear to angle, we have to calculate the angles first.
        # NOTE(review): the RPC targets "mo1_bragg" literally rather than
        # self.motor -- confirm this is intended when a different motor is
        # passed to the scan.
        pos_start = yield from self.stubs.send_rpc_and_wait(
            "mo1_bragg",
            "convert_angle_energy",
            mode = "EnergyToAngle",
            inp = self.start,
        )
        pos_end = yield from self.stubs.send_rpc_and_wait(
            "mo1_bragg",
            "convert_angle_energy",
            mode = "EnergyToAngle",
            inp = self.stop,
        )
        # Goniometer position is in the middle of the start and stop angle of the scan
        pos = (pos_start + pos_end) / 2
        # Premove with mo1_gonio
        yield from self.stubs.send_rpc_and_wait(
            "mo1_gonio",
            "move",
            position = pos,
            wait = True,
            timeout = 30,  # 30 seconds timeout
        )
        # Continue with staging the devices
        yield from self.stubs.stage()

    def update_readout_priority(self):
        """Ensure that NIDAQ is not monitored for any quick EXAFS."""
        super().update_readout_priority()
        # Force nidaq into the async readout group.
        self.readout_priority["async"].append("nidaq")

    def prepare_positions(self):
        """Prepare the positions for the scan.

        Use here only start and end energy defining the range for the scan.
        The final number of points is unknown upfront and is set at the end
        of scan_core.
        """
        self.positions = np.array([self.start, self.stop], dtype=float)
        self.num_pos = None
        yield None

    def pre_scan(self):
        """Pre Scan action."""
        self._check_limits()
        # Ensure parent class pre_scan actions to be called.
        yield from super().pre_scan()

    def scan_report_instructions(self):
        """
        Return the instructions for the scan report.
        """
        yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})

    def scan_core(self):
        """Run the scan core.

        Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
        """
        # Start the oscillation on the Bragg motor.
        yield from self.stubs.kickoff(device=self.motor)
        complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
        while not complete_status.done:
            # Readout monitored devices
            yield from self.stubs.read(group="monitored", point_id=self.point_id)
            time.sleep(self.primary_readout_cycle)
            self.point_id += 1
        # Total number of points only known once the motion has completed.
        self.num_pos = self.point_id
# class XASSimpleScanWithXRD(XASSimpleScan):
# """Class for the XAS simple scan with XRD"""
# scan_name = "xas_simple_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# If scan duration is set to 0, the scan will run infinitely.
# Args:
# start (float): Start energy for the scan.
# stop (float): Stop energy for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# motor=motor,
# **kwargs,
# )
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
class XASAdvancedScan(XASSimpleScan):
    """Class for the XAS advanced scan."""

    scan_name = "xas_advanced_scan"
    gui_config = {
        "Movement Parameters": ["start", "stop"],
        "Scan Parameters": ["scan_time", "scan_duration"],
        "Spline Parameters": ["p_kink", "e_kink"],
    }

    def __init__(
        self,
        start: float,
        stop: float,
        scan_time: float,
        scan_duration: float,
        p_kink: float,
        e_kink: float,
        motor: DeviceBase = "mo1_bragg",
        **kwargs,
    ):
        """Oscillate the mono motor over an energy range with a kinked motion profile.

        ``start``/``stop`` define the energy range, ``scan_time`` is the time
        for one oscillation cycle and ``scan_duration`` the total scan time.
        A scan_duration of 0 runs the scan indefinitely. ``p_kink`` and
        ``e_kink`` add a kink to the motion profile to slow the motion down
        in the EXAFS region of the scan.

        Args:
            start (float): Start angle for the scan.
            stop (float): Stop angle for the scan.
            scan_time (float): Time for one oscillation.
            scan_duration (float): Total duration of the scan (0 = infinite).
            p_kink (float): Position of the kink.
            e_kink (float): Energy of the kink.
            motor (DeviceBase, optional): Motor device to be used for the scan.
                Defaults to "mo1_bragg".

        Examples:
            >>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
        """
        base_args = dict(
            start=start,
            stop=stop,
            scan_time=scan_time,
            scan_duration=scan_duration,
            motor=motor,
        )
        super().__init__(**base_args, **kwargs)
        self.p_kink = p_kink
        self.e_kink = e_kink
# class XASAdvancedScanWithXRD(XASAdvancedScan):
# """Class for the XAS advanced scan with XRD"""
# scan_name = "xas_advanced_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Spline Parameters": ["p_kink", "e_kink"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# p_kink: float,
# e_kink: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_advanced_scan is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# Start and Stop define the energy range for the scan, scan_time is the time for
# one scan cycle and scan_duration is the duration of the scan. If scan duration
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
# motion profile to slow down in the exafs region of the scan.
# Args:
# start (float): Start angle for the scan.
# stop (float): Stop angle for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# p_kink (float): Position of kink.
# e_kink (float): Energy of the kink.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# p_kink=p_kink,
# e_kink=e_kink,
# motor=motor,
# **kwargs,
# )
# self.p_kink = p_kink
# self.e_kink = e_kink
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
+84
View File
@@ -0,0 +1,84 @@
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
    """Class for the nidaq continuous scan (without mono)."""

    scan_name = "nidaq_continuous_scan"
    scan_type = "fly"
    scan_report_hint = "device_progress"
    required_kwargs = []
    use_scan_progress_report = False
    pre_move = False
    gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}

    def __init__(
        self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
    ):
        """The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
        monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
        set scan_duration.

        Args:
            scan_duration (float): Duration of the scan.
            daq (DeviceBase, optional): DAQ device to be used for the scan.
                Defaults to "nidaq".
            compression (bool, optional): Whether data compression is requested for the
                acquisition; forwarded to consumers via scan_parameters. Defaults to False.

        Examples:
            >>> scans.nidaq_continuous_scan(scan_duration=10)
        """
        super().__init__(**kwargs)
        self.scan_duration = scan_duration
        self.daq = daq
        self.start_time = 0
        # Seconds between consecutive readouts of the monitored devices in scan_core.
        self.primary_readout_cycle = 1
        self.scan_parameters["scan_duration"] = scan_duration
        self.scan_parameters["compression"] = compression

    def update_readout_priority(self):
        """Ensure that NIDAQ is not monitored for any quick EXAFS."""
        super().update_readout_priority()
        # Use the configured device instead of a hard-coded "nidaq" so that a
        # non-default `daq` argument is handled consistently with
        # scan_report_instructions/scan_core (which already use self.daq).
        self.readout_priority["async"].append(self.daq)

    def prepare_positions(self):
        """Prepare the positions for the scan (no motor positions needed)."""
        yield None

    def pre_scan(self):
        """Pre Scan action: record the wall-clock start time of the scan."""
        self.start_time = time.time()
        # Ensure parent class pre_scan actions to be called.
        yield from super().pre_scan()

    def scan_report_instructions(self):
        """
        Return the instructions for the scan report.
        """
        yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})

    def scan_core(self):
        """Run the scan core.

        Kickoff the acquisition of the NIDAQ and wait for the completion of the scan,
        reading out the monitored devices once per primary_readout_cycle meanwhile.
        """
        kickoff_status = yield from self.stubs.kickoff(device=self.daq)
        kickoff_status.wait(timeout=5)  # wait for proper kickoff of device
        complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
        while not complete_status.done:
            # Readout monitored devices
            yield from self.stubs.read(group="monitored", point_id=self.point_id)
            time.sleep(self.primary_readout_cycle)
            self.point_id += 1
        self.num_pos = self.point_id
@@ -0,0 +1,216 @@
from __future__ import annotations
import queue
import threading
import time
import traceback
import h5py
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.bec_service import BECService
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.redis_connector import MessageObject, RedisConnector
from bec_lib.service_config import ServiceConfig
logger = bec_logger.logger
class NIDAQWriterService(BECService):
    """
    A service that receives data from the NIDAQ through Redis and writes it continuously to a file.
    """

    # If True, each batch is appended to one resizable dataset per signal;
    # otherwise every batch gets its own "dataset_<n>" group in the file.
    reshape_dataset = True
    # If True, consume from the "ni_data" Redis stream; otherwise pop from the
    # "ni_data:val" Redis list (legacy path).
    use_redis_stream = True

    def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
        """
        Initialize the writer service and start its consumer and writer threads.

        Args:
            config: Service configuration; must contain a "file_writer" section.
            connector_cls: Connector class used to communicate with Redis.
        """
        super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
        self.queue = queue.Queue()
        # Do not shadow the `config` parameter with the file-writer sub-config.
        file_writer_config = self._service_config.config.get("file_writer")
        self.writer_mixin = FileWriter(service_config=file_writer_config)
        self._scan_status_consumer = None
        self._ni_data_consumer = None
        self._ni_data_event = None
        self._ni_writer = None
        self._ni_writer_event = None
        self.scan_number = None
        self.scan_is_running = False
        self.filename = ""
        self.elapsed_time = 0
        self.start_time = 0
        self._start_scan_status_consumer()
        self._start_ni_data_consumer()
        self._start_ni_writer()

    def _start_scan_status_consumer(self) -> None:
        """
        Start the scan consumer.
        """
        self._scan_status_consumer = self.connector.consumer(
            MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self
        )
        self._scan_status_consumer.start()

    @staticmethod
    def _scan_status_callback(message: MessageObject, parent: NIDAQWriterService) -> None:
        """
        Callback for scan status messages.

        Args:
            message: The raw message object received from Redis.
            parent: The service instance that registered this callback.
        """
        msg = message.value
        if not msg:
            return
        parent.handle_scan_status(msg)

    def _start_ni_data_consumer(self) -> None:
        """
        Start the NI data consumer thread (reads data from Redis).
        """
        self._ni_data_event = threading.Event()
        self._ni_data_consumer = threading.Thread(target=self._run_read_loop, daemon=True)
        self._ni_data_consumer.start()

    def _start_ni_writer(self) -> None:
        """
        Start the NI data writer thread (writes queued data to disk).
        """
        self._ni_writer_event = threading.Event()
        self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
        self._ni_writer.start()

    def _run_read_loop(self) -> None:
        """
        Run the read loop until the consumer event is set.
        """
        while not self._ni_data_event.is_set():
            self._read_data()

    def _read_data(self):
        """
        Read one batch of NI data from Redis and forward it to the writer queue.

        Idles while no scan is running. Parse errors are logged and swallowed so
        that a single malformed message cannot kill the read loop.
        """
        if not self.scan_is_running:
            time.sleep(0.01)
            return
        self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
        start_time = time.time()
        if self.use_redis_stream:
            msg = self.connector.xread("ni_data")
            if msg:
                try:
                    num_msgs = len(msg[0][1])
                    logger.debug(
                        f"Received {num_msgs} messages in {time.time() - start_time} seconds"
                    )
                    msgs = [messages.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
                    start_time = time.time()
                    self.handle_ni_data(msgs)
                    logger.debug(
                        f"Handled {num_msgs} messages in {time.time() - start_time} seconds"
                    )
                except Exception:
                    content = traceback.format_exc()
                    logger.error(f"Failed to parse message: {content}")
            # Throttle the loop so we do not busy-spin on an empty stream.
            time.sleep(0.01)
        else:
            # Legacy list-based transport; pop up to 20 messages at once.
            # pylint: disable=protected-access
            msgs = self.connector._redis_conn.lpop("ni_data:val", 20)
            time.sleep(0.001)
            if msgs:
                try:
                    msgs = [messages.DeviceMessage.loads(msg) for msg in msgs]
                    # Use the logger (not print) for consistency with the stream branch.
                    logger.debug(
                        f"Received {len(msgs)} messages in {time.time() - start_time} seconds"
                    )
                    start_time = time.time()
                    self.handle_ni_data(msgs)
                    logger.debug(
                        f"Handled {len(msgs)} messages in {time.time() - start_time} seconds"
                    )
                except Exception:
                    content = traceback.format_exc()
                    logger.error(f"Failed to parse message: {content}")

    def handle_scan_status(self, msg: messages.ScanStatusMessage) -> None:
        """
        Handle scan status messages.

        A scan counts as running once an "open" status carrying a scan_number
        arrives; any other status stops data collection.

        Args:
            msg: The scan status message.
        """
        status = msg.content["status"]
        if status == "open":
            self.scan_number = msg.content["info"].get("scan_number")
            if self.scan_number is not None:
                self.scan_is_running = True
        else:
            self.scan_is_running = False

    def handle_ni_data(self, msgs: list[messages.DeviceMessage]) -> None:
        """
        Receive NI data messages and write them to the writer queue.

        Args:
            msgs: The NI data messages.
        """
        if not msgs:
            # Nothing to concatenate; avoid indexing into an empty batch.
            return
        logger.info(f"Received {len(msgs)} NI data messages")
        # concatenate all messages, signal by signal
        signals = {}
        for key in msgs[0].content["signals"]:
            signals[key] = np.concatenate([msg.content["signals"][key]["value"] for msg in msgs])
        # write data to queue
        self.queue.put(signals)

    def _write_data(self) -> None:
        """
        Get data from the writer queue and write it to disk until the writer event is set.
        """
        while not self._ni_writer_event.is_set():
            signals = self.queue.get()
            logger.info(f"Remaining tasks: {self.queue.qsize()}")
            self.write_data(signals)
            self.queue.task_done()

    def write_data(self, signals: dict) -> None:
        """
        Write data to disk.

        Args:
            signals: The signals to write to disk.
        """
        # create a new file if it doesn't exist, otherwise append to it
        logger.info("Writing NI data to HDF5 file")
        start_time = time.time()
        if not self.filename:
            # No scan has been opened yet; there is nowhere to write.
            return
        with h5py.File(self.filename, "a") as file:
            if self.reshape_dataset:
                # NOTE(review): maxshape=(None,) assumes 1-D signal arrays — confirm.
                for key in signals:
                    # if the dataset already exists, append to it
                    if key in file:
                        dataset = file[key]
                        dataset.resize(dataset.shape[0] + len(signals[key]), axis=0)
                        dataset[-len(signals[key]) :] = signals[key]
                    # otherwise create a new dataset
                    else:
                        file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
            else:
                # get all group names
                group_names = list(file.keys())
                # get max dataset number from existing "dataset_<n>" groups
                dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name]
                if dataset_num:
                    dataset_num = max(dataset_num) + 1
                else:
                    dataset_num = 0
                group = file.create_group(f"dataset_{dataset_num}")
                for key in signals:
                    group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
        logger.info(f"Finished writing NI data in {time.time() - start_time} seconds")
@@ -0,0 +1 @@
from .NIDAQ_writer import NIDAQWriterService
@@ -0,0 +1,34 @@
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
def send_scan_status(scan_number, status):
    """Publish a scan status message to Redis.

    "start" maps to an "open" scan status, "stop" to "closed"; anything else
    raises a ValueError.
    """
    status_by_command = {"start": "open", "stop": "closed"}
    if status not in status_by_command:
        raise ValueError("Unknown status")
    scan_status_msg = messages.ScanStatusMessage(
        scanID="test", status=status_by_command[status], info={"scan_number": scan_number}
    )
    producer = RedisConnector(["localhost:6379"]).producer()
    producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
    print(f"Sent scan status message {scan_status_msg}")
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Scan status helper")
    # required=True: without it, running the script with no sub-command would
    # crash with AttributeError on args.scan_number instead of a usage error.
    command = parser.add_subparsers(dest="command", required=True)
    start = command.add_parser("start", help="Start a new scan")
    start.add_argument("--scan_number", type=int, required=True, help="Scan number")
    stop = command.add_parser("stop", help="Stop the scan")
    stop.add_argument("--scan_number", type=int, required=True, help="Scan number")
    args = parser.parse_args()
    send_scan_status(args.scan_number, args.command)
@@ -0,0 +1,44 @@
import threading
import time
import numpy as np
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
class NIDAQSim(threading.Thread):
    """Thread that periodically publishes simulated NIDAQ data to Redis.

    Used to exercise the NIDAQ writer service without real hardware.
    """

    # If True, publish to the "ni_data" Redis stream; otherwise push to a list.
    use_redis_stream = True

    def run(self):
        """Build one large device message and republish it every 0.5 s, forever."""
        print("NIDAQSim running")
        index = 0
        producer = RedisConnector(["localhost:6379"]).producer()
        # Build the payload once up front and reuse it each iteration; the
        # serialization cost dominates, not the data content.
        signal = np.asarray(range(index, index + 600000))
        signals = {"signal1": signal, "signal2": signal}
        msg = messages.DeviceMessage(signals=signals)
        msg = msg.dumps()
        # Sanity check: the serialized message must round-trip cleanly.
        messages.DeviceMessage.loads(msg)
        while True:
            start = time.time()
            if self.use_redis_stream:
                producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
            else:
                producer.lpush("ni_data", msg, max_size=10)
            time.sleep(0.5)
            # NOTE: the reported time includes the 0.5 s sleep.
            print(f"Elapsed time: {time.time() - start}")
if __name__ == "__main__":
    # Start the simulator thread; it loops forever, so the process runs
    # until it is killed.
    NIDAQSim().start()
+29
View File
@@ -0,0 +1,29 @@
import argparse
import threading
from bec_lib import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from superxas_bec.services.NIDAQ_writer import NIDAQWriterService
# Launcher script for the NIDAQ writer service: parse the config path,
# start the service, and block the main thread until interrupted.
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", default="", help="path to the config file")
clargs = parser.parse_args()
config_path = clargs.config
config = ServiceConfig(config_path)
bec_logger.level = bec_logger.LOGLEVEL.INFO
logger = bec_logger.logger
bec_server = NIDAQWriterService(config=config, connector_cls=RedisConnector)
try:
    # The service runs in background threads; keep the main thread alive.
    event = threading.Event()
    # pylint: disable=E1102
    logger.success("Started NIDAQ writer service")
    event.wait()
except KeyboardInterrupt:
    bec_server.shutdown()
    event.set()
    # Bare raise preserves the original traceback (idiomatic re-raise).
    raise
@@ -0,0 +1,15 @@
"""This module tests the Timepix Fly backend functionality."""
from __future__ import annotations
import pytest
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
@pytest.fixture(scope="function")
def timepix_fly_backend():
    """Fixture for creating a Timepix Fly backend instance."""
    backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
    try:
        yield backend
    finally:
        # Always run teardown, matching the try/finally style of the
        # timepix_fly_client fixture in the client test module.
        backend.on_destroy()
@@ -0,0 +1,276 @@
"""Module to test the Timepix Fly client functionality."""
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import StatusBase
from websockets import State
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimepixFlyClient,
TimePixFlyStatus,
TimePixStatusError,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import ProgramState
@pytest.fixture(scope="function")
def timepix_fly_client():
    """Fixture for creating a Timepix Fly client instance."""
    # No real server is contacted: the tests patch requests/websocket calls.
    client = TimepixFlyClient(rest_url="http://localhost:8000", ws_url="ws://localhost:8000/ws")
    try:
        yield client
    finally:
        # Always stop the background update thread and close the websocket.
        client.shutdown()
@pytest.mark.parametrize(
    "return_state",
    [
        ProgramState(state=TimePixFlyStatus.CONFIG),
        ProgramState(state=TimePixFlyStatus.COLLECT),
        ProgramState(state=TimePixFlyStatus.SETUP),
    ],
)
def test_timepix_fly_client_stop_running_collection(timepix_fly_client, return_state):
    """Test stop_running_collection of the Timepix Fly client for each backend state.

    The assertion messages previously described the opposite of what each
    assert checked; they now match the asserted call counts.
    """
    with (
        mock.patch.object(timepix_fly_client, "stop_collect") as mock_stop_collect,
        mock.patch.object(timepix_fly_client, "state", return_value=return_state),
    ):
        timepix_fly_client.stop_running_collection()
        if return_state.state == TimePixFlyStatus.CONFIG:
            # In CONFIG nothing is running, so no stop is issued.
            assert mock_stop_collect.call_count == 0, "Stop collect should not be called in CONFIG."
            timepix_fly_client._started = True
            timepix_fly_client.stop_running_collection()
            # Once a collection was started, it must be stopped even in CONFIG.
            assert (
                mock_stop_collect.call_count == 1
            ), "Stop collect should be called once after start."
        else:
            assert mock_stop_collect.call_count == 1, "Stop collect should be called once."
            timepix_fly_client._started = True
            timepix_fly_client.stop_running_collection()
            assert (
                mock_stop_collect.call_count == 2
            ), "Stop collect should be called again after start."
def test_timepix_fly_client_on_connected(timepix_fly_client):
    """Verify on_connected delegates to its three collaborators exactly once.

    The collaborators themselves are tested separately.
    """
    with mock.patch.object(timepix_fly_client, "stop_running_collection") as stop_mock:
        with mock.patch.object(timepix_fly_client, "connect") as connect_mock:
            with mock.patch.object(timepix_fly_client, "wait_for_connection") as wait_mock:
                timepix_fly_client.on_connected()
    stop_mock.assert_called_once()
    connect_mock.assert_called_once()
    wait_mock.assert_called_once()
def test_timepix_fly_client_connect(timepix_fly_client):
    """connect creates and starts exactly one background update thread."""
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
    # Patch Thread so the background update loop never actually runs.
    with mock.patch(f"{module_path}.threading.Thread") as thread_cls:
        thread_instance = mock.Mock()
        thread_cls.return_value = thread_instance
        timepix_fly_client.connect()
        # One thread is constructed and started on it.
        thread_cls.assert_called_once()
        thread_instance.start.assert_called_once()
def test_timepix_fly_client_wait_for_connection(timepix_fly_client):
    """wait_for_connection returns at once for an open socket, else connects."""
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"

    def make_open_ws():
        ws = mock.Mock()
        type(ws).state = mock.PropertyMock(return_value=State.OPEN)
        return ws

    # Case 1: an already-open websocket makes the call return immediately.
    timepix_fly_client.ws_client = make_open_ws()
    timepix_fly_client.wait_for_connection(timeout=0.1)

    # Case 2: with no client set, connect() establishes the connection.
    timepix_fly_client.ws_client = None
    with mock.patch(f"{module_path}.connect", return_value=make_open_ws()) as connect_mock:
        timepix_fly_client.wait_for_connection(timeout=0.1)
        connect_mock.assert_called_once()
def test_timepix_fly_client_ws_send_and_received(timepix_fly_client):
    """_ws_send_and_receive forwards received messages and swallows timeouts."""
    # A message returned by recv must be passed to _on_received_ws_message.
    receiving_ws = mock.Mock()
    receiving_ws.recv.return_value = "init"
    timepix_fly_client.ws_client = receiving_ws
    with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as handler:
        timepix_fly_client._ws_send_and_receive()
    handler.assert_called_once_with("init")

    # A TimeoutError from recv means "nothing received": handler untouched.
    silent_ws = mock.Mock()
    silent_ws.recv.side_effect = TimeoutError
    timepix_fly_client.ws_client = silent_ws
    with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as handler2:
        timepix_fly_client._ws_send_and_receive()
    handler2.assert_not_called()
def test_timepix_fly_client_on_message_received(timepix_fly_client):
    """_on_received_ws_message updates status only for valid status strings."""
    # Valid status string: status changes and callbacks fire.
    with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as callbacks:
        timepix_fly_client._on_received_ws_message("init")
        assert timepix_fly_client._status == TimePixFlyStatus.INIT
        callbacks.assert_called_once()

    # Invalid message: status stays as-is and no callbacks run.
    previous_status = timepix_fly_client._status
    with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as callbacks:
        timepix_fly_client._on_received_ws_message("invalid_status_string")
        assert timepix_fly_client._status == previous_status
        callbacks.assert_not_called()
def test_timepix_fly_client_on_status_callbacks(timepix_fly_client):
    """This tests the _run_status_callbacks method of timepix fly client.

    Covers three scenarios: a callback added with run=True while the current
    status is already a success state, a deferred callback triggered later by
    _run_status_callbacks (which also resets the _started flag), and the error
    path where the current status is in the callback's error list.
    """
    # Immediate run when current status already in success
    timepix_fly_client._status = TimePixFlyStatus.INIT
    status = StatusBase()
    timepix_fly_client.add_status_callback(
        status=status, success=[TimePixFlyStatus.INIT], error=[TimePixFlyStatus.EXCEPT], run=True
    )
    assert status.done is True and status.success is True
    # Add callback (do not run immediately) and then trigger via _run_status_callbacks
    status2 = StatusBase()
    timepix_fly_client.add_status_callback(
        status=status2,
        success=[TimePixFlyStatus.CONFIG],
        error=[TimePixFlyStatus.EXCEPT],
        run=False,
    )
    # Set status to CONFIG and mark started True to check reset
    timepix_fly_client._status = TimePixFlyStatus.CONFIG
    timepix_fly_client._started = True
    timepix_fly_client._run_status_callbacks()
    assert status2.done is True and status2.success is True
    # Running the callbacks must also clear the started flag.
    assert timepix_fly_client._started is False
    # Error path: add with run True when status is EXCEPT
    timepix_fly_client._status = TimePixFlyStatus.EXCEPT
    status3 = StatusBase()
    with mock.patch.object(timepix_fly_client, "last_error") as mock_last_error:
        # last_error supplies the message attached to the failed status.
        mock_err = mock.Mock()
        mock_err.message = "boom"
        mock_last_error.return_value = mock_err
        timepix_fly_client.add_status_callback(
            status=status3,
            success=[TimePixFlyStatus.INIT],
            error=[TimePixFlyStatus.EXCEPT],
            run=True,
        )
    assert status3.done is True and status3.success is False
def test_timepix_fly_client_shutdown(timepix_fly_client):
    """shutdown closes the websocket, clears it, and sets the shutdown event."""
    ws = mock.Mock()
    timepix_fly_client.ws_client = ws
    timepix_fly_client.shutdown()
    # The socket must be closed exactly once and the reference dropped.
    ws.close.assert_called_once()
    assert timepix_fly_client.ws_client is None
    # The background loop observes this event and terminates.
    assert timepix_fly_client._shutdown_event.is_set()
def test_timepix_fly_client_start(timepix_fly_client):
    """start issues GET ?start=true against the REST URL and sets _started.

    The client's _get builds f"http://{self.rest_url}/{get_cmd}"; we mock
    requests.get, verify URL and timeout, and check the flag.
    """
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
    with mock.patch(f"{module_path}.requests.get") as get_mock:
        response = mock.Mock()
        response.raise_for_status = mock.Mock()
        response.text = ""
        get_mock.return_value = response
        timepix_fly_client.start()
        get_mock.assert_called_once_with(
            f"http://{timepix_fly_client.rest_url}/?start=true",
            timeout=timepix_fly_client._timeout,
        )
        assert timepix_fly_client._started is True
def test_timepix_fly_client_stop_collect(timepix_fly_client):
    """stop_collect issues GET ?stop_collect=true and clears _started."""
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
    with mock.patch(f"{module_path}.requests.get") as get_mock:
        response = mock.Mock()
        response.raise_for_status = mock.Mock()
        response.text = ""
        get_mock.return_value = response
        # Start from a "running" state and stop the collection.
        timepix_fly_client._started = True
        timepix_fly_client.stop_collect()
        get_mock.assert_called_once_with(
            f"http://{timepix_fly_client.rest_url}/?stop_collect=true",
            timeout=timepix_fly_client._timeout,
        )
        assert timepix_fly_client._started is False
def test_timepix_fly_client_state(timepix_fly_client):
    """state issues GET /state and parses the JSON payload into a ProgramState."""
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
    with mock.patch(f"{module_path}.requests.get") as get_mock:
        response = mock.Mock()
        response.raise_for_status = mock.Mock()
        # JSON payload compatible with the ProgramState model.
        response.json.return_value = {"type": "ProgramState", "state": "init"}
        get_mock.return_value = response
        program_state = timepix_fly_client.state()
        get_mock.assert_called_once_with(
            f"http://{timepix_fly_client.rest_url}/state",
            timeout=timepix_fly_client._timeout,
        )
        # The 'state' literal must survive parsing.
        assert hasattr(program_state, "state")
        assert program_state.state == "init"
def test_timepix_fly_client_set_pixel_map(timepix_fly_client):
    """This tests the set_pixel_map/_put path by mocking requests.put and checking payload.

    Previously the expected URL was computed but never asserted; the call is
    now checked against it as well.
    """
    module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
    with mock.patch(f"{module_path}.requests.put") as mock_put:
        mock_resp = mock.Mock()
        mock_resp.raise_for_status = mock.Mock()
        mock_put.return_value = mock_resp
        # Minimal valid PixelMap dict (type is optional; model supplies default)
        pixel_map = {"chips": [[{"i": 0, "p": 1, "f": 2}]]}
        timepix_fly_client.set_pixel_map(pixel_map)
        expected_url = f"http://{timepix_fly_client.rest_url}/pixel-map"
        # Verify requests.put called with expected url, json and timeout
        mock_put.assert_called_once()
        args, kwargs = mock_put.call_args
        # The URL may be passed positionally or as a keyword argument.
        assert expected_url in args or kwargs.get("url") == expected_url
        assert kwargs.get("timeout") == timepix_fly_client._timeout
        assert kwargs.get("json") is not None
        assert kwargs.get("json").get("chips") == pixel_map["chips"]