Preliminary ophyd integration for the A3200 #13

Merged
mohacsi_i merged 27 commits from feature/device-aerotech-a3200 into main 2025-01-24 16:25:00 +01:00
13 changed files with 6315 additions and 344 deletions

2107
pxiii_bec/config_saved.yaml Normal file

File diff suppressed because it is too large Load Diff

2094
pxiii_bec/config_saved.yaml~ Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 31 10:27:18 2022
Database migration from a sensible format to the BEC YAML file format
@author: mohacsi_i
"""
import yaml
import yaml.representer
def MigrateYamlFile(filein, fileout):
""" Migrates an absolutely minimal YAML config file to the format
required by the BEC (i.e. adding default fields).
"""
fp = open(filein, "r")
lut = yaml.load(fp, Loader=yaml.Loader)
# Allocate empty database
db = dict()
for k,v in lut.items():
new = v
# Adding defaults
if 'onFailure' not in new:
new['onFailure'] = "buffer"
if 'enabled' not in new:
new['enabled'] = True
if 'readoutPriority' not in new:
new['readoutPriority'] = "monitored"
if 'readOnly' not in new:
new['readOnly'] = bool(new['deviceClass'] in ('ophyd.EpicsSignalRO'))
if 'softwareTrigger' not in new:
new['softwareTrigger'] = False
if new['deviceClass'] == "ophyd.EpicsSignalRO":
if "read_pv" not in new['deviceConfig']:
new["deviceConfig"]["read_pv"] = new["deviceConfig"]["prefix"]
del new["deviceConfig"]["prefix"]
db[k] = new
with open(fileout, 'w') as stream:
yaml.dump(db, stream, default_flow_style=None, sort_keys=False)
# Automatically start simulation if directly invoked
if __name__ == "__main__":
MigrateYamlFile("./x06da_compact.lmay", "x06da_device_config.yaml")

View File

@@ -1,199 +0,0 @@
# OP before mono
slh_trxr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-SLH:TRXR'
slh_trxw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-SLH:TRXW'
fi1_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-FI1:TRY'
# DCCM crystal 1
dccm_pitch1:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:PITCH1'
dccm_energy1:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:ENERGY1'
dccm_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-OP-XPM1:BOT:READOUT'
# DCCM crystal 2
dccm_pitch2:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:PITCH2'
dccm_energy2:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:ENERGY2'
dccm_xbpm:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV'
# DCCM common
dccm_energy:
description: Monochromator energy using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:_ENERGY'
dccm_eoffset:
description: Monochromator energy offset for ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:_EOFFSET'
# Secondary source XBPM
ssxbpm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSBPM1:TRX'
ssxbpm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSBPM1:TRY'
ssslit_trxr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSH1:TRXR'
ssslit_trxw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSH1:TRXW'
ssslit_tryt:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSV1:TRYT'
ssslit_tryb:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSV1:TRYB'
ssxi1_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSXI1:TRX'
ssxi1_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSXI1:TRY'
# Vertical focusing mirror
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRXU'
enabled: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRXD'
enabled: false
vfm_tryuw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYUW'
vfm_tryr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYR'
vfm_trydw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYDW'
vfm_pitch:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:PITCH'
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:YAW'
enabled: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:ROLL'
enabled: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRX'
enabled: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRY'
# Horizontal focusing mirror
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRXU'
enabled: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRXD'
enabled: false
hfm_tryur:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYUR'
hfm_tryw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYW'
hfm_trydr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYDR'
hfm_pitch:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:PITCH'
enabled: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:YAW'
enabled: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:ROLL'
enabled: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRX'
enabled: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRY'
# Exposure box signals
xbox_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-ES-DI1:READOUT'
bstop_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-ES-BS:READOUT'

View File

@@ -1,5 +1,5 @@
slh_trxr:
description: OP slit inner blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXR'}
onFailure: buffer
@@ -8,6 +8,7 @@ slh_trxr:
readOnly: false
softwareTrigger: false
slh_trxw:
description: OP slit outer blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXW'}
onFailure: buffer
@@ -16,14 +17,16 @@ slh_trxw:
readOnly: false
softwareTrigger: false
fi1_try:
description: Beam attenuator motion before mono
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY'}
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_pitch1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
onFailure: buffer
@@ -32,6 +35,7 @@ dccm_pitch1:
readOnly: false
softwareTrigger: false
dccm_energy1:
description: Monochromator energy 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY1'}
onFailure: buffer
@@ -40,14 +44,16 @@ dccm_energy1:
readOnly: false
softwareTrigger: false
dccm_diode:
description: Diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT'}
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_pitch2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
onFailure: buffer
@@ -56,6 +62,7 @@ dccm_pitch2:
readOnly: false
softwareTrigger: false
dccm_energy2:
description: Monochromator energy 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY2'}
onFailure: buffer
@@ -63,49 +70,62 @@ dccm_energy2:
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_energy:
description: Monochromator energy using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_ENERGY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_eoffset:
description: Monochromator energy offset between crystals using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_EOFFSET'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_xbpm:
description: XBPM total intensity after monochromator
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV'}
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
# dccm_energy:
# description: Monochromator energy using ECMC virtual motors
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-OP-DCCM:_ENERGY'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# dccm_eoffset:
# description: Monochromator energy offset for ECMC virtual motors
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-OP-DCCM:_EOFFSET'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
ssxbpm_trx:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX'}
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_try:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY'}
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# ssxbpm:
# description: XBPM before secondary source
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-SSBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
ssslit_trxr:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'}
onFailure: buffer
@@ -114,6 +134,7 @@ ssslit_trxr:
readOnly: false
softwareTrigger: false
ssslit_trxw:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'}
onFailure: buffer
@@ -122,6 +143,7 @@ ssslit_trxw:
readOnly: false
softwareTrigger: false
ssslit_tryt:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'}
onFailure: buffer
@@ -130,6 +152,7 @@ ssslit_tryt:
readOnly: false
softwareTrigger: false
ssslit_tryb:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'}
onFailure: buffer
@@ -138,16 +161,18 @@ ssslit_tryb:
readOnly: false
softwareTrigger: false
ssxi1_trx:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX'}
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_try:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY'}
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -155,7 +180,7 @@ ssxi1_try:
softwareTrigger: false
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRXU'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -163,39 +188,40 @@ vfm_trxu:
softwareTrigger: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRXD'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_tryuw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYUW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_tryr:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trydw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYDW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# vfm_tryuw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_tryr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_trydw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
vfm_pitch:
description: KB mirror vertical steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:PITCH'}
deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -203,7 +229,7 @@ vfm_pitch:
softwareTrigger: false
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:YAW'}
deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -211,7 +237,7 @@ vfm_yaw:
softwareTrigger: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:ROLL'}
deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -219,7 +245,7 @@ vfm_roll:
softwareTrigger: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRX'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -227,7 +253,7 @@ vfm_trx:
softwareTrigger: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRY'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -235,7 +261,7 @@ vfm_try:
softwareTrigger: false
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRXU'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -243,39 +269,40 @@ hfm_trxu:
softwareTrigger: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRXD'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_tryur:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYUR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_tryw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trydr:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYDR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# hfm_tryur:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_tryw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_trydr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
hfm_pitch:
description: KB mirror horizontal steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:PITCH'}
deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -283,7 +310,7 @@ hfm_pitch:
softwareTrigger: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:YAW'}
deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -291,7 +318,7 @@ hfm_yaw:
softwareTrigger: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:ROLL'}
deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -299,7 +326,7 @@ hfm_roll:
softwareTrigger: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRX'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -307,13 +334,59 @@ hfm_trx:
softwareTrigger: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRY'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# xbox_xbpm:
# description: Exposure box XBPM
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-XBBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
xbox_fil1:
description: Exposure box filter wheel 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI1:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil2:
description: Exposure box filter wheel 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI2:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil3:
description: Exposure box filter wheel 3
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI3:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil4:
description: Exposure box filter wheel 4
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI4:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_diode:
description: Exposure box diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DI1:READOUT'}
onFailure: buffer
@@ -321,11 +394,230 @@ xbox_diode:
readoutPriority: monitored
readOnly: true
softwareTrigger: false
bstop_diode:
samdist:
description: Sample distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT'}
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-USER1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
samrange:
description: Sample in valid distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-CMP1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
samzoom:
description: Sample microscope zoom
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SAMCAM:ZOOM'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samcam:
description: Sample camera device
deviceClass: ophyd_devices.devices.areadetector.cam.GenICam
deviceConfig: {prefix: 'X06DA-SAMCAM:cam1:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic in-out
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_x:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_y:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_z:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_diode:
description: Beamstop diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
backlight:
description: Backlight reflector
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmx:
description: ABR horizontal stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMX', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmy:
description: ABR vertical stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMY', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
omega:
description: ABR rotation stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
abr:
description: Aerotech ABR motion system
deviceClass: pxiii_bec.devices.AerotechAbrStage
deviceConfig: {prefix: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxis
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shy:
description: SmarGon Y axis
deviceClass: pxiii_bec.devices.SmarGonAxis
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shz:
description: SmarGon Z axis
deviceClass: pxiii_bec.devices.SmarGonAxis
deviceConfig: {prefix: 'SCS', low_limit: 10, high_limit: 22, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
chi:
description: SmarGon CHI axis
deviceClass: pxiii_bec.devices.SmarGonAxis
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
phi:
description: SmarGon PHI axis
deviceClass: pxiii_bec.devices.SmarGonAxis
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samimg:
description: Sample camera image
deviceClass: ophyd_devices.devices.areadetector.plugins.ImagePlugin_V35
deviceConfig: {prefix: 'X06DA-SAMCAM:image1:'}
onFailure: buffer
enabled: false
readoutPriority: monitored
readOnly: true
softwareTrigger: false

502
pxiii_bec/devices/A3200.py Normal file
View File

@@ -0,0 +1,502 @@
"""
``Aerotech`` --- Aerotech control software
******************************************
This module provides an object to control the Aerotech Abr rotational stage.
Methods in the Abr class
========================
Standard bluesky interface:
AerotechAbrStage.configure(d={...})
AerotechAbrStage.kickoff()
AerotechAbrStage.stop()
Additional bluesky functionality:
Aerotech.is_homed()
Aerotech.do_homing(wait=True)
Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True)
Aerotech.is_done()
Aerotech.is_ready()
Aerotech.is_busy()
Aerotech.start_exposure()
Aerotech.wait_status(status)
Aerotech.move(angle, wait=False, speed=None)
Aerotech.set_shutter(state)
Attribute Access in Abr class
=============================
The Abr class overwrites the getattr and setattr methods to provide a pythonic
way of controlling the rootation stage of the Abr.
The following properties are implemented:
velocity
Sets the velocity of rotation: ``-ES-DF1:ROTX-SETV``
omega
Move the omega angle without any wait: ``-ES-DF1:ROTX-VAL``
exp_time
Sets the PV for the measurement's exposure time: ``-ES-OSC:ETIME``
start_angle
Sets the PV for the measurement's starting angle: ``-ES-OSC:START-POS``
oscillation_angle
Sets the PV for the measurement's oscillation angle: ``-ES-OSC:RANGE``
shutter
Controls the shutter: ``-ES-PH1:SET`` and ``-ES-PH1:GET``
measurement_state
Returns the PV for the measurement state: ``-ES-OSC:DONE``
axis_mode
Returns the axis mode: ``-ES-DF1:AXES-MODE``
Examples
========
abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
# move omega to 270.0 degrees
abr.omega = 270.0
# move omega to 180 degrees and wait for movement to finish
abr.move(180, wait=True)
# move omega to 3000 degrees at 360 degrees/s and wait for movement to finish
abr.move(3000, velocity=360, wait=True)
# stop any movement
abr.stop() # this function only returns after the STATUS is back to OK
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
try:
from .A3200enums import AbrCmd, AbrMode
except ImportError:
from A3200enums import AbrCmd, AbrMode
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("A3200")
# pylint: disable=logging-fstring-interpolation
class AerotechAbrMixin(CustomDeviceMixin):
"""Configuration class for the Aerotech A3200 controller for the ABR stage"""
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
logger.warning(f"Configuring {self.parent.scaninfo.scan_msg.info['scan_name']} on ABR")
d = {}
if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"):
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in ("standardscan"):
scan_start = scanargs["start"]
scan_range = scanargs["range"]
scan_move_time = scanargs["move_time"]
scan_ready_rate = scanargs.get("ready_rate", 500)
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scan_start
d["var_2"] = scan_range
d["var_3"] = scan_move_time
d["var_4"] = scan_ready_rate
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
d["var_8"] = 0
d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
scan_exp_time = scanargs["exp_time"]
scan_range_y = scanargs["range"]
scan_steps_y = scanargs["steps"]
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scan_range_y / scan_steps_y
d["var_2"] = scan_steps_y
d["var_3"] = scan_exp_time
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
d["var_8"] = 0
d["var_9"] = 0
if scanname in ("screeningscan"):
scan_start = scanargs["start"]
scan_range = scanargs["range"]
scan_stepnum_o = scanargs["steps"]
scan_exp_time = scanargs["exp_time"]
scan_oscrange = scanargs["oscrange"]
scan_delta = scanargs.get("delta", 0.5)
scan_stepsize_o = scan_range / scan_stepnum_o
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scan_start
d["var_2"] = scan_oscrange
d["var_3"] = scan_exp_time
d["var_4"] = scan_stepsize_o
d["var_5"] = scan_stepnum_o
d["var_6"] = scan_delta
d["var_7"] = 0
d["var_8"] = 0
d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
scan_exp_time = scanargs["exp_time"]
scan_range_x = scanargs["range_x"]
scan_range_y = scanargs["range_y"]
scan_stepnum_x = scanargs["steps_x"]
scan_stepnum_y = scanargs["steps_y"]
scan_stepsize_x = scan_range_x / scan_stepnum_x
scan_stepsize_y = scan_range_y / scan_stepnum_y
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scan_exp_time
d["var_2"] = scan_stepsize_x
d["var_3"] = scan_stepsize_y
d["var_4"] = scan_stepnum_x
d["var_5"] = scan_stepnum_y
d["var_6"] = 0
d["var_7"] = 0
d["var_8"] = 0
d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.parent.configure(d)
# Stage the parent
self.parent.bluestage()
def on_kickoff(self):
"""Kick off parent"""
self.parent.bluekickoff()
def on_unstage(self):
"""Unstage the ABR controller"""
self.parent.blueunstage()
class AerotechAbrStage(PsiDeviceBase):
"""Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX
beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and
the associated motion axes GMX, GMY and GMZ. The ophyd class associates to
the general PX measurement procedure, which is that the actual scan script
is running as an AeroBasic program on the controller and we communicate to
it via 10+1 global variables.
"""
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ["reset", "kickoff", "complete"]
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
# Shutter box is missing readback so the -GET signal is installed on the VME
# _shutter = Component(
# EpicsSignal, "-PH1:GET", write_pv="-PH1:SET", put_complete=True, kind=Kind.config,
# )
# Status flags for all axes
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
# Global variables to controll AeroBasic scripts
_var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
_var_2 = Component(EpicsSignal, "-PSO:VAR-2", put_complete=True, kind=Kind.omitted)
_var_3 = Component(EpicsSignal, "-PSO:VAR-3", put_complete=True, kind=Kind.omitted)
_var_4 = Component(EpicsSignal, "-PSO:VAR-4", put_complete=True, kind=Kind.omitted)
_var_5 = Component(EpicsSignal, "-PSO:VAR-5", put_complete=True, kind=Kind.omitted)
_var_6 = Component(EpicsSignal, "-PSO:VAR-6", put_complete=True, kind=Kind.omitted)
_var_7 = Component(EpicsSignal, "-PSO:VAR-7", put_complete=True, kind=Kind.omitted)
_var_8 = Component(EpicsSignal, "-PSO:VAR-8", put_complete=True, kind=Kind.omitted)
_var_9 = Component(EpicsSignal, "-PSO:VAR-9", put_complete=True, kind=Kind.omitted)
_var_10 = Component(EpicsSignal, "-PSO:VAR-10", put_complete=True, kind=Kind.omitted)
# Task status PVs (programs always run on task 1)
task1 = Component(EpicsSignalRO, "-AERO:TSK1-DONE", auto_monitor=True)
task2 = Component(EpicsSignalRO, "-AERO:TSK2-DONE", auto_monitor=True)
task3 = Component(EpicsSignalRO, "-AERO:TSK3-DONE", auto_monitor=True)
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
# A few PVs still needed from grid
raster_scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
raster_num_rows = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
"""Set axis mode to direct/measurement mode.
Measurement ensures that the scrips run undisturbed by blocking axis
motion commands from the IOC (i.e. internal only).
Parameters:
-----------
mode : str
Valid values are 'direct' and 'measuring'.
"""
if mode == "direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def configure(self, d: dict) -> tuple:
""" " Configure the exposure scripts
Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by
setting a pre-defined set of global variables. This method performs the
configuration of the exposure scrips by setting the required global
variables.
Parameters in d: dict
---------------------
scan_command: int
The index of the desired AeroBasic program to be executed.
Usually supported values are taken from an Enum.
var_1:
var_2:
var_3:
var_4:
var_5:
var_6:
var_7:
var_8:
var_9:
var_10:
"""
old = self.read_configuration()
# ToDo: Check if idle before reconfiguring
self.scan_command.set(d["scan_command"]).wait()
# Set the corresponding global variables
if "var_1" in d and d["var_1"] is not None:
self._var_1.set(d["var_1"]).wait()
if "var_2" in d and d["var_2"] is not None:
self._var_2.set(d["var_2"]).wait()
if "var_3" in d and d["var_3"] is not None:
self._var_3.set(d["var_3"]).wait()
if "var_4" in d and d["var_4"] is not None:
self._var_4.set(d["var_4"]).wait()
if "var_5" in d and d["var_5"] is not None:
self._var_5.set(d["var_5"]).wait()
if "var_6" in d and d["var_6"] is not None:
self._var_6.set(d["var_6"]).wait()
if "var_7" in d and d["var_7"] is not None:
self._var_7.set(d["var_7"]).wait()
if "var_8" in d and d["var_8"] is not None:
self._var_8.set(d["var_8"]).wait()
if "var_9" in d and d["var_9"] is not None:
self._var_9.set(d["var_9"]).wait()
if "var_10" in d and d["var_10"] is not None:
self._var_10.set(d["var_10"]).wait()
new = self.read_configuration()
return old, new
def bluestage(self):
"""Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""
pass
def bluekickoff(self, timeout=1) -> SubscriptionStatus:
"""Kick off the set program"""
self.start_command.set(1).wait()
# Define wait until the busy flag goes high
def is_busy(*, value, **_):
return bool(value == 0)
# Subscribe and wait for update
status = SubscriptionStatus(
self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1
)
return status
def blueunstage(self, settle_time=0.1):
"""Stops current script and releases the axes"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def complete(self, timeout=None) -> SubscriptionStatus:
"""Waits for task execution
NOTE: Original complete was raster scanner complete...
"""
# Define wait until the busy flag goes down (excluding initial update)
def is_idle(*, value, **_):
return bool(value == 1)
# Subscribe and wait for update
# status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(
self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5
)
return status
def reset(self, settle_time=0.1, wait_after_reload=1) -> None:
"""Resets the Aerotech controller state
Attempts to reset the currently running measurement task on the A3200
by stopping current motions, reloading aerobasic programs and going
to DIRECT mode.
This will stop movements in both DIRECT and MEASURING modes. During the
stop the `status` temporarely goes to ERROR but reverts to OK after a
couple of seconds.
"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1, settle_time=settle_time)
# Reload tasks
self.taskStop.set(1, settle_time=wait_after_reload).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def stop(self, settle_time=1.0) -> None:
"""Stops current motions"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def is_ioc_ok(self):
"""Checks execution status"""
return 0 == self.status.get()
# @property
# def exp_time(self):
# return self.osc.exp_time.get()
# @exp_time.setter
# def exp_time(self, value):
# self.osc.etime.set(value).wait()
# @property
# def start_angle(self):
# return self.osc.ostart_pos.get()
# @start_angle.setter
# def start_angle(self, value):
# self.osc.ostart_pos(value).wait()
# @property
# def measurement_state(self):
# return self.osc.phase.get()
# @measurement_state.setter
# def measurement_state(self, value):
# self.osc.phase.set(value).wait()
@property
def axis_mode(self):
return self.axisAxesMode.get()
# @property
# def shutter(self):
# return self._shutter.get()
# @shutter.setter
# def shutter(self, value):
# if self.axisAxesMode.get():
# print("ABR is not in direct mode; cannot manipulate shutter")
# return False
# state = str(state).lower()
# if state not in ["1", "0", "closed", "open"]:
# print("unknown shutter state requested")
# return None
# elif state in ["1", "open"]:
# state = 1
# elif state == ["0", "closed"]:
# state = 0
# self._shutter.set(state).wait()
# return state == self._shutter.get()
def wait_for_movements(self, timeout=60.0):
"""Waits for all motor movements"""
t_start = time.time()
t_elapsed = 0
while self.is_moving() and t_elapsed < timeout:
t_elapsed = time.time() - t_start
if timeout is not None and t_elapsed > timeout:
raise TimeoutError("Timeout waiting for all axis to stop moving")
time.sleep(0.5)
def is_moving(self):
"""Chechs if all axes are DONE"""
return not (
self.omega_done.get()
and self.gmx_done.get()
and self.gmy_done.get()
and self.gmz_done.get()
)
# def start_exposure(self):
# """Starts the previously configured exposure."""
# self.wait_for_movements()
# self.osc.taskStart.set(1).wait()
# for _ in range(10):
# try:
# self.osc.wait_status(ABR_BUSY, timeout=1)
# except RuntimeWarning as ex:
# logger.error(f"{ex} --- trying start again.")
# self.osc.kickoff()
if __name__ == "__main__":
abr = AerotechAbrStage(prefix="X06DA-ES", name="abr")
abr.wait_for_connection()

View File

@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
"""
Enumerations for the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
# pylint: disable=too-few-public-methods
class AbrStatus:
"""ABR measurement task status"""
DONE = 0
READY = 1
BUSY = 2
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
class AbrGridStatus:
"""ABR grid scan status"""
BUSY = 0
DONE = 1
GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1
class AbrMode:
"""ABR mode status"""
DIRECT = 0
MEASURING = 1
DIRECT_MODE = 0
MEASURING_MODE = 1
class AbrShutterStatus:
"""ABR shutter status"""
CLOSE = 0
OPEN = 1
SHUTTER_CLOSE = 0
SHUTTER_OPEN = 1
class AbrGridPeriod:
"""ABR grid period"""
FULL = 0
HALF = 1
FULL_PERIOD = 0
HALF_PERIOD = 1
class AbrCmd:
"""ABR command table"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
SUPER_FAST_OMEGA = 5
STILL_WEDGE = 6
STILLS = 7
REPEAT_SINGLE_OSCILLATION = 8
SINGLE_OSCILLATION = 9
OLD_FASHIONED = 10
RASTER_SCAN = 11
JET_ROTATION = 12
X_HELICAL = 13
X_RUNSEQ = 14
JUNGFRAU = 15
MSOX = 16
SLIT_SCAN = 17
RASTER_SCAN_STILL = 18
SCAN_SASTT = 19
SCAN_SASTT_V2 = 20
SCAN_SASTT_V3 = 21
CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2
CMD_VERTICAL_LINE_SCAN = 3
CMD_SCREENING = 4
CMD_SUPER_FAST_OMEGA = 5
CMD_STILL_WEDGE = 6
CMD_STILLS = 7
CMD_REPEAT_SINGLE_OSCILLATION = 8
CMD_SINGLE_OSCILLATION = 9
CMD_OLD_FASHIONED = 10
CMD_RASTER_SCAN = 11
CMD_JET_ROTATION = 12
CMD_X_HELICAL = 13
CMD_X_RUNSEQ = 14
CMD_JUNGFRAU = 15
CMD_MSOX = 16
CMD_SLIT_SCAN = 17
CMD_RASTER_SCAN_STILL = 18
CMD_SCAN_SASTT = 19
CMD_SCAN_SASTT_V2 = 20
CMD_SCAN_SASTT_V3 = 21
class AbrAxis:
"""ABR axis index"""
OMEGA = 1
GMX = 2
GMY = 3
GMZ = 4
STY = 5
STZ = 6
AXIS_OMEGA = 1
AXIS_GMX = 2
AXIS_GMY = 3
AXIS_GMZ = 4
AXIS_STY = 5
AXIS_STZ = 6

View File

@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
import types
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus
from .A3200enums import AbrMode
from bec_lib import bec_logger
logger = bec_logger.logger
# ABR_DONE = 0
# ABR_READY = 1
# ABR_BUSY = 2
# GRID_SCAN_BUSY = 0
# GRID_SCAN_DONE = 1
# DIRECT_MODE = 0
# MEASURING_MODE = 1
class A3200Axis(PVPositioner):
"""Positioner wrapper for A3200 axes
Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC
does not provide a motor record, this class simply wraps axes into a
standard Ophyd positioner for the BEC. It also has some additional
functionality for error checking and diagnostics.
Examples
--------
omega = A3200Axis('X06DA-ES-DF1:OMEGA', base_pv='X06DA-ES')
class abr(Device):
omega = Component(A3200Axis, '-DF1:OMEGA')
gmx = Component(A3200Axis, '-DF1:GMX')
gmy = Component(A3200Axis, '-DF1:GMY')
Parameters
----------
prefix : str
Axis PV name root.
base_pv : str (situational)
IOC PV name root, i.e. X06DA-ES if standalone class.
"""
USER_ACCESS = ["omove"]
abr_mode_direct = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
abr_mode = Component(
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted
)
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
# setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
# PV to issue native relative movements on the A3200
relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config)
# PV to home axis
home = Component(EpicsSignal, "-HOME", kind=Kind.config)
ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config)
# HW status words
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.normal)
ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.normal)
fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.normal)
# Rock movement
_rock = Component(EpicsSignal, "-ROCK", put_complete=True, kind=Kind.config)
_rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config)
_rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config)
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
# _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config)
vmin = Component(Signal, kind=Kind.config)
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
base_pv="",
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
llm=0,
hlm=0,
vmin=0,
vmax=0,
**kwargs,
):
"""__init__ MUST have a full argument list"""
# Patching the parent's PVs into the axis class to check for direct/locked mode
if parent is None:
def maybe_add_prefix(self, instance, kw, suffix):
# Patched not to enforce parent prefix when no parent
if kw in self.add_prefix:
return suffix
return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode"]
)
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"]
)
logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def omove(
self,
position,
wait=True,
timeout=None,
# moved_cb=None,
velocity=None,
relative=False,
direct=False,
**kwargs,
) -> MoveStatus:
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode
if self.abr_mode.value not in (AbrMode.DIRECT, "DIRECT"):
if direct:
self.abr_mode_direct.set(1).wait()
else:
raise RuntimeError(f"ABR axis not in direct mode: {self.abr_mode.value}")
# Before moving, ensure we can stop (if a stop_signal is configured).
if self.stop_signal is not None:
self.stop_signal.wait_for_connection()
# Set velocity if provided
if velocity is not None:
self.velocity.set(velocity).wait()
# This is adapted from pv_positioner.py
self.check_value(position)
# Get MoveStatus from parent of parent
status = PositionerBase.move(self, position=position, timeout=timeout, **kwargs)
has_done = self.done is not None
if not has_done:
moving_val = 1 - self.done_value
self._move_changed(value=self.done_value)
self._move_changed(value=moving_val)
try:
if relative:
# Relative movement instead of setpoint
self.relmove.put(position, wait=True)
else:
# Standard absolute movement
self.setpoint.put(position, wait=True)
if wait:
status.wait()
except KeyboardInterrupt:
self.stop()
raise
return status
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
"""Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
"""Repeated single axis zigzag scan I guess PSO should be configured for this"""
self._rock_dist.put(distance)
self._rock_count.put(counts)
if velocity is not None:
self._rock_velo.put(velocity)
# if acceleration is not None:
# self._rock_accel.put(acceleration)
self._rock.put(1)
status = super().move(position=distance)
if wait:
status.wait()
return status
# def is_omega_ok(self):
# """Checks omega axis status"""
# return 0 == self.self.omega.status.get()
# def is_homed(self):
# """Checks if omega is homed"""
# return 1 == self.omega.is_homed.get()
# def do_homing(self, wait=True):
# """Execute the homing procedure.
# Executes the homing procedure on omega and waits (default) until it is completed.
# TODO: Return a status object to do this wwith futures and monitoring.
# PARAMETERS
# `wait` true / false if the routine is to wait for the homing to finish.
# """
# self.omega.home.set(1, settle_time=1).wait()
# if not wait:
# return
# while not self.omega.is_homed():
# time.sleep(0.2)
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega")
omega.wait_for_connection()

View File

@@ -0,0 +1,189 @@
import time
import requests
from ophyd import Component, Device, Kind, Signal, SignalRO
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
class SmarGonSignal(Signal):
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.write_addr = write_addr
self.addr = self.parent.name
self._limits = (low_limit, high_limit)
# self.get()
def put(self, value, *, timestamp=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}", **kwargs)
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.addr.upper()]
self._value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def get(self, *args, **kwargs):
r = self.parent._go_n_get(self.write_addr)
# print(r)
if isinstance(r, dict):
self._value = r[self.addr.upper()]
else:
self._value = r
return super().get(*args, **kwargs)
class SmarGonSignalRO(Signal):
"""Small helper class for read-only parameters PVs from SmarGon.
TODO: Add monitoring
"""
def __init__(self, *args, read_addr="readbackSCS", **kwargs):
super().__init__(*args, **kwargs)
self._metadata["write_access"] = False
self.read_addr = read_addr
self.addr = self.parent.name
def get(self, *args, **kwargs):
r = self.parent._go_n_get(self.read_addr)
# print(r)
if isinstance(r, dict):
self.put(r[self.addr.upper()], force=True)
else:
self.put(r, force=True)
return super().get(*args, **kwargs)
class SmarGonAxis(Device):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface.
"""
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
# Axis parameters
readback = Component(SmarGonSignalRO, kind=Kind.hinted)
setpoint = Component(SmarGonSignal, kind=Kind.normal)
done = Component(SignalRO, value=1, kind=Kind.normal)
# moving = Component(SmarGonMovingSignalRO, kind=Kind.config)
moving = 1
def __init__(
self,
prefix="SCS",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}"
self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
# device_manager=device_manager,
**kwargs,
)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
def initialize(self):
"""Helper function for initial readings"""
# self.corr.get()
# self.mode.get()
r = self._go_n_get("corr_type")
print(r)
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -0,0 +1,393 @@
#!/usr/bin/env python3
from time import sleep, time
from typing import Tuple
from requests import get, put
from beamline import beamline
from mx_redis import SMARGON
try:
from mx_preferences import get_config
host = get_config(beamline)["smargon"]["host"]
port = get_config(beamline)["smargon"]["port"]
except Exception:
host = "x06da-smargopolo.psi.ch"
port = 3000
base = f"http://{host}:{port}"
def gonget(thing: str, **kwargs) -> dict:
"""issue a GET for some API component on the smargopolo server"""
cmd = f"{base}/{thing}"
if kwargs.get("verbose", False):
print(cmd)
r = get(cmd)
if not r.ok:
raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}")
return r.json()
def gonput(thing: str, **kwargs):
"""issue a PUT for some API component on the smargopolo server"""
cmd = f"{base}/{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def scsput(**kwargs):
"""
Issue a new absolute target in the SH coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {
k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def bcsput(**kwargs):
"""
Issue a new absolute target in the beamline coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi")
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetBCS?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def scsrelput(**kwargs) -> None:
"""
Issue relative increments to current SH coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {
k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS_rel?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def bcsrelput(**kwargs):
"""
Issue relative increments to current beamline coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("bx", "by", "bz")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetBCS_rel?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
# url_redis = f"{beamline}-cons-705.psi.ch"
# print(f"connecting to redis DB #3 on host: {url_redis}")
# redis_handle = redis.StrictRedis(host=url_redis, db=3)
# pubsub = redis_handle.pubsub()
MODE_UNINITIALIZED = 0
MODE_INITIALIZING = 1
MODE_READY = 2
MODE_ERROR = 99
class SmarGon(object):
def __init__(self):
super(SmarGon, self).__init__()
self.__dict__.update(target=None)
self.__dict__.update(bookmarks={})
self.__dict__.update(_latest_message={})
# pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS})
# pubsub.run_in_thread(sleep_time=0.5, daemon=True)
def __repr__(self):
BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values()
return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>"
def _cb_readbackSCS(self, msg):
if msg["data"] in ["hset"]:
self._latest_message = msg
def move_home(self, wait=False) -> None:
"""move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0"""
self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0})
if wait:
self.wait_home()
def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None:
"""
Move smargon in absolute beamline coordinates
:param coords: a tuple of floats representing X, Y, Z coordinates
:type coords:
:param wait:
:type wait:
:return:
:rtype:
"""
x, y, z = coords
# the two steps below are necessary otherwise the control system
# remembers *a* previous CHI
bcs = self.bcs
bcs.update({"BX": x, "BY": y, "BZ": z})
self.bcs = bcs
if wait:
self.wait()
def wait_home(self, timeout: float = 20.0) -> None:
"""
wait for the smargon to reach its home position:
SHX = 0.0
SHY = 0.0
SHZ = 18.0
CHI = 0.0
PHI = 0.0
:param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached
:type timeout: float
:return:
:rtype:
"""
tout = timeout + time()
in_place = [False, False]
rbv = -999.0
while not all(in_place) and time() < tout:
rbv = self.readback_scs()
in_place = []
for k, v in {"SHX": 0.0, "SHY": 0.0, "SHZ": 18.0, "CHI": 0.0, "PHI": 0.0}.items():
in_place.append(abs(rbv[k] - v) < 0.01)
if time() > tout:
raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
def push_bookmark(self):
"""
save current absolute coordinates in FIFO stack
:return:
:rtype:
"""
t = round(time())
self.bookmarks[t] = self.readback_scs()
def pop_bookmark(self):
return self.bookmarks.popitem()[1]
def apply_bookmark_sh(self, scs):
scsput(**scs)
def apply_last_bookmark_sh(self):
scs = self.pop_bookmark()
scsput(**scs)
def readback_mcs(self):
"""current motor positions of the smargon sliders"""
return gonget("readbackMCS")
def readback_scs(self):
"""current SH coordinates of the smargon model"""
return gonget("readbackSCS")
def readback_bcs(self):
"""current beamline coordinates of the smargon"""
return gonget("readbackBCS")
def target_scs(self):
"""currently assigned targets for the smargon control system"""
return gonget("targetSCS")
def initialize(self):
"""initialize the smargon"""
self.set_mode(MODE_UNINITIALIZED)
sleep(0.1)
self.set_mode(MODE_INITIALIZING)
def set_mode(self, mode: int):
"""put smargon control system in a given mode
MODE_UNINITIALIZED = 0
MODE_INITIALIZING = 1
MODE_READY = 2
MODE_ERROR = 99
"""
gonput(f"mode?mode={mode}")
def enable_correction(self):
"""enable calibration based corrections"""
gonput("corr_type?corr_type=1")
def disable_correction(self):
"""disable calibration based corrections"""
gonput("corr_type?corr_type=0")
def chi(self, val=None, wait=False):
if val is None:
return self.readback_scs()["CHI"]
scsput(CHI=val)
if wait:
timeout = 10 + time()
while time() < timeout:
if abs(val - self.readback_scs()["CHI"]) < 0.1:
break
if time() > timeout:
raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time")
def phi(self, val=None, wait=False):
if val is None:
return self.readback_scs()["PHI"]
scsput(PHI=val)
if wait:
timeout = 70 + time()
while time() < timeout:
if abs(val - self.readback_scs()["PHI"]) < 0.1:
break
if time() > timeout:
raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time")
def wait(self, timeout=60.0):
"""waits up to `timeout` seconds for smargon to reach target"""
target = {
k.upper(): v
for k, v in self.target_scs().items()
if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
timeout = timeout + time()
while time() < timeout:
s = {
k: (abs(v - target[k]) < 0.01)
for k, v in self.readback_scs().items()
if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI")
}
if all(list(s.values())):
break
if time() > timeout:
raise TimeoutError("timed out waiting for smargon to reach target")
def __setattr__(self, key, value):
key = key.lower()
if key == "mode":
self.set_mode(value)
elif key == "correction":
assert value in (
0,
1,
False,
True,
), "correction is either 1 or True (enabled) or 0 (disabled)"
gonput(f"corr_type?corr_type?{value}")
elif key == "scs":
scsput(**value)
elif key == "bcs":
bcsput(**value)
elif key == "target":
if not isinstance(value, dict):
raise Exception(
f"expected a dict with target axis and values got something else: {value}"
)
for k in value.keys():
if k.lower() not in "shx shy shz chi phi ox oy oz".split():
raise Exception(f'unknown axis in target "{k}"')
scsput(**value)
elif key in "shx shy shz chi phi ox oy oz".split():
scsput(**{key: value})
elif key in "bx by bz".split():
bcs = self.readback_bcs()
bcs[key] = value
bcsput(**bcs)
else:
self.__dict__[key].update(value)
def __getattr__(self, key):
key = key.lower()
if key == "mode":
return self.readback_mcs()["mode"]
elif key == "correction":
return gonget("corr_type")
elif key == "bcs":
return self.readback_bcs()
elif key == "mcs":
return self.readback_mcs()
elif key == "scs":
return self.readback_scs()
elif key in "shx shy shz chi phi ox oy oz".split():
return self.readback_scs()[key.upper()]
elif key in "bx by bz".split():
return self.readback_bcs()[key.upper()]
else:
return self.__getattribute__(key)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="SmarGon client")
parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true")
args = parser.parse_args()
smargon = SmarGon()
if args.initialize:
print("initializing smargon device")
import Aerotech
print("moving aerotech back by 50mm")
abr = Aerotech.Abr()
abr.incr_x(-50.0, wait=True, velo=100.0)
print("issuing init command to smargon")
smargon.initialize()
sleep(0.5)
print("waiting for init routine to complete")
while MODE_READY != smargon.mode:
sleep(0.5)
print("moving smargon to HOME position")
smargon.move_home()
print("moving aerotech to its previous position")
abr.incr_x(50.0, wait=True, velo=100.0)
exit(0)

View File

@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
"""
Ophyd devices for the PX III beamline, including the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
from .A3200 import AerotechAbrStage
from .A3200utils import A3200Axis
from .SmarGon import SmarGonAxis

View File

@@ -0,0 +1,6 @@
from .mx_measurements import (
MeasureStandardWedge,
MeasureVerticalLine,
MeasureRasterSimple,
MeasureScreening,
)

View File

@@ -0,0 +1,249 @@
""" MX measurements module
Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels.
"""
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class AbrCmd:
"""Valid Aerotech ABR scan commands from the AeroBasic files"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
# SUPER_FAST_OMEGA = 5 # Some Japanese measured samples in capilaries at high RPMs
# STILL_WEDGE = 6 # NOTE: Unused Step scan
# STILLS = 7 # NOTE: Unused Just send triggers to detector
# REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
# SINGLE_OSCILLATION = 9
# OLD_FASHIONED = 10 # NOTE: Unused
# RASTER_SCAN = 11
# JET_ROTATION = 12 # NOTE: Unused
# X_HELICAL = 13 # NOTE: Unused
# X_RUNSEQ = 14 # NOTE: Unused
# JUNGFRAU = 15
# MSOX = 16 # NOTE: Unused
# SLIT_SCAN = 17 # NOTE: Unused
# RASTER_SCAN_STILL = 18
# SCAN_SASTT = 19
# SCAN_SASTT_V2 = 20
# SCAN_SASTT_V3 = 21
class AerotechFlyscanBase(AsyncFlyScanBase):
"""Base class for MX flyscans
Low-level base class for standard scans at the PX beamlines at SLS. Theese
scans use the A3200 rotation stage and the actual experiment is performed
using an AeroBasic script controlled via global variables. The base class
has some basic safety features like checking status then sets globals and
fires off the scan. Implementations can choose to set the corresponding
configurations in child classes or pass it as command line parameters.
IMPORTANT: The AeroBasic scripts take care of the PSO configuration.
Parameters:
-----------
abr_complete : bool
Wait for the launched ABR task to complete.
"""
scan_type = "fly"
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config
abr_raster_reset = False
abr_complete = False
abr_timeout = None
pointID = 0
num_pos = 0
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters.
"""
super().__init__(parameter=parameter, **kwargs)
if "abr_raster_reset" in self.caller_kwargs:
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
if "abr_complete" in self.caller_kwargs:
self.abr_complete = self.caller_kwargs.get("abr_complete")
if "abr_timeout" in self.caller_kwargs:
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# TODO: Move roughly to start position???
# ABR status checking
stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get")
if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get")
# From what I got values are copied to local vars at the start of scan,
# so only kickoff should be forbidden.
if task not in (1, "OK"):
raise RuntimeError("Aerotech ABR task #1 seems to busy")
# Reset the raster scan engine
if self.abr_raster_reset:
yield from self.stubs.send_rpc_and_wait("abr", "raster_scan_done.set", 0)
# Call super
yield from super().pre_scan()
# def stage(self):
# """ ToDo: Sot sure if we should call super() here as it'd stage the whole beamline"""
# return super().stage()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
logger.info("Measurement launched on the ABR stage...")
# Wait for grid scanner to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class MeasureStandardWedge(AerotechFlyscanBase):
"""Standard wedge scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : (float, float)
Scan start position of the axis.
range : (float, float)
Scan range of the axis.
move_time : (float)
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
"""
scan_name = "standardscan"
required_kwargs = ["start", "range", "move_time"]
class MeasureVerticalLine(AerotechFlyscanBase):
"""Vertical line scan using the GMY motor
Simple relative continous line scan that records a single vertical line
with PSO output. There's no actual stepping, it's only used for velocity
calculation.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_vline(range_y=12, steps_y=40, exp_time=0.1)
Parameters
----------
range : float
Step size [mm].
steps : int
Scan range of the axis.
exp_time : float
Eeffective exposure time per step [s]
"""
scan_name = "vlinescan"
required_kwargs = ["exp_time", "range", "steps"]
class MeasureRasterSimple(AerotechFlyscanBase):
"""Simple raster scan
Measure a simplified relative zigzag raster scan in the X-Y plane.
The scan is relative assumes the goniometer is currently at the CENTER of
the first cell (i.e. TOP-LEFT). Each line is executed as a single continous
movement, i.e. there's no actual stepping in the X direction.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.raster_simple(exp_time=0.1, range_x=4, range_y=4, steps_x=80, steps_y=80)
Parameters
----------
exp_time : float
Effective exposure time for each cell along the X axis [s].
range_x : float
Scan step size [mm].
range_y : float
Scan step size [mm].
steps_x : int
Number of scan steps in X (fast). Only used for velocity calculation.
steps_y : int
Number of scan steps in Y (slow).
"""
scan_name = "rasterscan"
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
class MeasureScreening(AerotechFlyscanBase):
"""Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking
1 image/interval. This makes sure that we hit diffraction peaks if there
are any crystals.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_screening(start=42, range=180, steps=18, exp_time=0.1, oscrange=2.0)
Parameters
----------
start : float
Absolute scan start position of the omega axis [deg].
range : float
Total screened range of the omega axis relative to 'start' [deg].
steps : int
Number of blurred intervals.
exp_time : float
Exposure time per blurred interval [s].
oscrange : float
Motion blurring of each interval [deg]
delta : float
Safety margin for sub-range movements (default: 0.5) [deg].
"""
scan_name = "screeningscan"
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]