BEC style A3200 seems working
This commit is contained in:
@@ -100,7 +100,7 @@ dccm_xbpm:
|
||||
ssxbpm_trx:
|
||||
description: XBPM motion before secondary source
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -109,7 +109,7 @@ ssxbpm_trx:
|
||||
ssxbpm_try:
|
||||
description: XBPM motion before secondary source
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -127,7 +127,7 @@ ssxbpm_try:
|
||||
ssslit_trxr:
|
||||
description: Secondary source blade motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSLH1:TRXR'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -136,7 +136,7 @@ ssslit_trxr:
|
||||
ssslit_trxw:
|
||||
description: Secondary source blade motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSLH1:TRXW'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -145,7 +145,7 @@ ssslit_trxw:
|
||||
ssslit_tryt:
|
||||
description: Secondary source blade motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSLV1:TRYT'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -154,7 +154,7 @@ ssslit_tryt:
|
||||
ssslit_tryb:
|
||||
description: Secondary source blade motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSLV1:TRYB'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -163,7 +163,7 @@ ssslit_tryb:
|
||||
ssxi1_trx:
|
||||
description: Secondary source diagnostic screen motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -172,7 +172,7 @@ ssxi1_trx:
|
||||
ssxi1_try:
|
||||
description: Secondary source diagnostic screen motion
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -194,34 +194,34 @@ vfm_trxd:
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
vfm_tryuw:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
vfm_tryr:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
vfm_trydw:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
# vfm_tryuw:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
# vfm_tryr:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
# vfm_trydw:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
vfm_pitch:
|
||||
description: KB mirror vertical steering
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-VFM:PITCH'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -229,7 +229,7 @@ vfm_pitch:
|
||||
softwareTrigger: false
|
||||
vfm_yaw:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-VFM:YAW'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -237,7 +237,7 @@ vfm_yaw:
|
||||
softwareTrigger: false
|
||||
vfm_roll:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-VFM:ROLL'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -245,7 +245,7 @@ vfm_roll:
|
||||
softwareTrigger: false
|
||||
vfm_trx:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRX'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -253,7 +253,7 @@ vfm_trx:
|
||||
softwareTrigger: false
|
||||
vfm_try:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRY'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -275,34 +275,34 @@ hfm_trxd:
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
hfm_tryur:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
hfm_tryw:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
hfm_trydr:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
# hfm_tryur:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
# hfm_tryw:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
# hfm_trydr:
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readoutPriority: monitored
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
hfm_pitch:
|
||||
description: KB mirror horizontal steering
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-HFM:PITCH'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -310,7 +310,7 @@ hfm_pitch:
|
||||
softwareTrigger: false
|
||||
hfm_yaw:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-HFM:YAW'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -318,7 +318,7 @@ hfm_yaw:
|
||||
softwareTrigger: false
|
||||
hfm_roll:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-HFM:ROLL'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -326,7 +326,7 @@ hfm_roll:
|
||||
softwareTrigger: false
|
||||
hfm_trx:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRX'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'}
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readoutPriority: monitored
|
||||
@@ -334,7 +334,7 @@ hfm_trx:
|
||||
softwareTrigger: false
|
||||
hfm_try:
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRY'}
|
||||
deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
|
||||
@@ -79,11 +79,16 @@ Examples
|
||||
|
||||
"""
|
||||
import time
|
||||
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd.status import SubscriptionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
|
||||
|
||||
try:
|
||||
from .A3200enums import AbrCmd, AbrMode
|
||||
except ImportError:
|
||||
from A3200enums import AbrCmd, AbrMode
|
||||
|
||||
from .A3200utils import A3200RasterScanner, A3200Oscillator
|
||||
from .A3200enums import *
|
||||
|
||||
try:
|
||||
from bec_lib import bec_logger
|
||||
@@ -92,11 +97,114 @@ except ModuleNotFoundError:
|
||||
import logging
|
||||
logger = logging.getLogger("A3200")
|
||||
|
||||
|
||||
# pylint: disable=logging-fstring-interpolation
|
||||
class AerotechAbrMixin(CustomDeviceMixin):
|
||||
""" Configuration class for the Aerotech A3200 controller for the ABR stage"""
|
||||
|
||||
|
||||
|
||||
class AerotechAbrStage(Device):
|
||||
|
||||
|
||||
def on_stage(self):
|
||||
"""
|
||||
|
||||
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
|
||||
"""
|
||||
|
||||
logger.warning(f"Configuring {self.parent.scaninfo.scan_msg.info['scan_name']} on ABR")
|
||||
|
||||
d = {}
|
||||
if self.parent.scaninfo.scan_type in ("measure", "measurement"):
|
||||
scanargs = self.parent.scaninfo.scan_msg.info['kwargs']
|
||||
scanname = self.parent.scaninfo.scan_msg.info['scan_name']
|
||||
|
||||
if scanname in ("standardscan"):
|
||||
scan_start = scanargs["start"]
|
||||
scan_range = scanargs["range"]
|
||||
scan_move_time = scanargs["move_time"]
|
||||
scan_ready_rate = scanargs.get("ready_rate", 500)
|
||||
d["scan_command"] = AbrCmd.MEASURE_STANDARD
|
||||
d["var_1"] = scan_start
|
||||
d["var_2"] = scan_range
|
||||
d["var_3"] = scan_move_time
|
||||
d["var_4"] = scan_ready_rate
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
if scanname in ("verticallinescan", "vlinescan"):
|
||||
scan_exp_time = scanargs["exp_time"]
|
||||
scan_range_y = scanargs["range"]
|
||||
scan_steps_y = scanargs["steps"]
|
||||
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
|
||||
d["var_1"] = scan_exp_time
|
||||
d["var_2"] = scan_range_y
|
||||
d["var_3"] = scan_steps_y
|
||||
d["var_4"] = 0
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
if scanname in ("screeningscan"):
|
||||
scan_start = scanargs["start"]
|
||||
scan_range = scanargs["range"]
|
||||
scan_stepnum_o = scanargs["steps"]
|
||||
scan_exp_time = scanargs["exp_time"]
|
||||
scan_oscrange = scanargs["oscrange"]
|
||||
scan_delta = scanargs.get("delta", 0.5)
|
||||
scan_stepsize_o = scan_range / scan_stepnum_o
|
||||
d["scan_command"] = AbrCmd.SCREENING
|
||||
d["var_1"] = scan_start
|
||||
d["var_2"] = scan_oscrange
|
||||
d["var_3"] = scan_exp_time
|
||||
d["var_4"] = scan_stepsize_o
|
||||
d["var_5"] = scan_stepnum_o
|
||||
d["var_6"] = scan_delta
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
if scanname in ("rasterscan", "rastersimplescan"):
|
||||
scan_exp_time = scanargs["exp_time"]
|
||||
scan_range_x = scanargs["range_x"]
|
||||
scan_range_y = scanargs["range_y"]
|
||||
scan_stepnum_x = scanargs["steps_x"]
|
||||
scan_stepnum_y = scanargs["steps_y"]
|
||||
scan_stepsize_x = scan_range_x / scan_stepnum_x
|
||||
scan_stepsize_y = scan_range_y / scan_stepnum_y
|
||||
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
d["var_1"] = scan_exp_time
|
||||
d["var_2"] = scan_stepsize_x
|
||||
d["var_3"] = scan_stepsize_y
|
||||
d["var_4"] = scan_stepnum_x
|
||||
d["var_5"] = scan_stepnum_y
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
|
||||
# Reconfigure if got a valid scan config
|
||||
if len(d)>0:
|
||||
self.parent.configure(d)
|
||||
|
||||
# Stage the parent
|
||||
self.parent.bluestage()
|
||||
|
||||
def on_kickoff(self):
|
||||
""" Kick off parent """
|
||||
self.parent.bluekickoff()
|
||||
|
||||
def on_unstage(self):
|
||||
""" Unstage the ABR controller"""
|
||||
self.parent.blueunstage()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class AerotechAbrStage(PsiDeviceBase):
|
||||
""" Standard PX stage on A3200 controller
|
||||
|
||||
This is the wrapper class for the standard rotation stage layout for the PX
|
||||
@@ -106,12 +214,15 @@ class AerotechAbrStage(Device):
|
||||
is running as an AeroBasic program on the controller and we communicate to
|
||||
it via 10+1 global variables.
|
||||
"""
|
||||
custom_prepare_cls = AerotechAbrMixin
|
||||
USER_ACCESS = ['reset', 'kickoff', 'complete']
|
||||
|
||||
taskStop = Component(
|
||||
EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
|
||||
status = Component(
|
||||
EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
|
||||
clear = Component(
|
||||
EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
|
||||
|
||||
# Enable/disable motor movement via the IOC (i.e. make it task-only)
|
||||
axisModeLocked = Component(
|
||||
@@ -126,9 +237,6 @@ class AerotechAbrStage(Device):
|
||||
# EpicsSignal, "-PH1:GET", write_pv="-PH1:SET", put_complete=True, kind=Kind.config,
|
||||
# )
|
||||
|
||||
raster = Component(A3200RasterScanner, "", kind=Kind.config)
|
||||
osc = Component(A3200Oscillator, "", kind=Kind.config)
|
||||
|
||||
# Status flags for all axes
|
||||
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal)
|
||||
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal)
|
||||
@@ -179,7 +287,7 @@ class AerotechAbrStage(Device):
|
||||
if mode=="direct":
|
||||
self.axisModeDirect.set(37, settle_time=settle_time).wait()
|
||||
if mode=="measuring":
|
||||
self.axisAxesMode.set(MEASURING_MODE, settle_time=settle_time).wait()
|
||||
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
|
||||
|
||||
def configure(self, d: dict) -> tuple:
|
||||
"""" Configure the exposure scripts
|
||||
@@ -209,9 +317,8 @@ class AerotechAbrStage(Device):
|
||||
old = self.read_configuration()
|
||||
|
||||
# ToDo: Check if idle before reconfiguring
|
||||
|
||||
# Set the corresponding global variables
|
||||
self.scan_command.set(d['scan_command']).wait()
|
||||
# Set the corresponding global variables
|
||||
if "var_1" in d and d['var_1'] is not None:
|
||||
self._var_1.set(d['var_1']).wait()
|
||||
if "var_2" in d and d['var_2'] is not None:
|
||||
@@ -236,6 +343,34 @@ class AerotechAbrStage(Device):
|
||||
new = self.read_configuration()
|
||||
return old, new
|
||||
|
||||
def bluestage(self):
|
||||
""" Bluesky-style stage
|
||||
|
||||
Since configuration synchronization is not guaranteed, this does
|
||||
nothing. The script launched by kickoff().
|
||||
"""
|
||||
pass
|
||||
|
||||
def bluekickoff(self, timeout=1) -> SubscriptionStatus:
|
||||
""" Kick off the set program """
|
||||
self.start_command.set(1).wait()
|
||||
|
||||
# Define wait until the busy flag goes high
|
||||
def is_busy(*args, value, **kwargs):
|
||||
return bool(value==0)
|
||||
|
||||
# Subscribe and wait for update
|
||||
status = SubscriptionStatus(self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1)
|
||||
return status
|
||||
|
||||
def blueunstage(self, settle_time=0.1):
|
||||
""" Stops current script and releases the axes"""
|
||||
# Disarm commands
|
||||
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
|
||||
self.stop_command.set(1).wait()
|
||||
# Go to direct mode
|
||||
self.set_axis_mode("direct", settle_time=settle_time)
|
||||
|
||||
def complete(self, timeout=None) -> SubscriptionStatus:
|
||||
""" Waits for task execution
|
||||
|
||||
@@ -262,7 +397,7 @@ class AerotechAbrStage(Device):
|
||||
couple of seconds.
|
||||
"""
|
||||
# Disarm commands
|
||||
self.scan_command.set(CMD_NONE, settle_time=settle_time).wait()
|
||||
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
|
||||
self.stop_command.set(1, settle_time=settle_time)
|
||||
# Reload tasks
|
||||
self.taskStop.set(1, settle_time=wait_after_reload).wait()
|
||||
@@ -273,7 +408,8 @@ class AerotechAbrStage(Device):
|
||||
""" Stops current motions
|
||||
"""
|
||||
# Disarm commands
|
||||
self.scan_command.set(CMD_NONE, settle_time=settle_time).wait()
|
||||
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
|
||||
self.stop_command.set(1).wait()
|
||||
# Go to direct mode
|
||||
self.set_axis_mode("direct", settle_time=settle_time)
|
||||
|
||||
@@ -281,29 +417,29 @@ class AerotechAbrStage(Device):
|
||||
"""Checks execution status"""
|
||||
return 0 == self.status.get()
|
||||
|
||||
@property
|
||||
def exp_time(self):
|
||||
return self.osc.exp_time.get()
|
||||
# @property
|
||||
# def exp_time(self):
|
||||
# return self.osc.exp_time.get()
|
||||
|
||||
@exp_time.setter
|
||||
def exp_time(self, value):
|
||||
self.osc.etime.set(value).wait()
|
||||
# @exp_time.setter
|
||||
# def exp_time(self, value):
|
||||
# self.osc.etime.set(value).wait()
|
||||
|
||||
@property
|
||||
def start_angle(self):
|
||||
return self.osc.ostart_pos.get()
|
||||
# @property
|
||||
# def start_angle(self):
|
||||
# return self.osc.ostart_pos.get()
|
||||
|
||||
@start_angle.setter
|
||||
def start_angle(self, value):
|
||||
self.osc.ostart_pos(value).wait()
|
||||
# @start_angle.setter
|
||||
# def start_angle(self, value):
|
||||
# self.osc.ostart_pos(value).wait()
|
||||
|
||||
@property
|
||||
def measurement_state(self):
|
||||
return self.osc.phase.get()
|
||||
# @property
|
||||
# def measurement_state(self):
|
||||
# return self.osc.phase.get()
|
||||
|
||||
@measurement_state.setter
|
||||
def measurement_state(self, value):
|
||||
self.osc.phase.set(value).wait()
|
||||
# @measurement_state.setter
|
||||
# def measurement_state(self, value):
|
||||
# self.osc.phase.set(value).wait()
|
||||
|
||||
@property
|
||||
def axis_mode(self):
|
||||
@@ -346,16 +482,16 @@ class AerotechAbrStage(Device):
|
||||
self.omega_done.get()
|
||||
and self.gmx_done.get() and self.gmy_done.get() and self.gmz_done.get() )
|
||||
|
||||
def start_exposure(self):
|
||||
"""Starts the previously configured exposure."""
|
||||
self.wait_for_movements()
|
||||
self.osc.taskStart.set(1).wait()
|
||||
for _ in range(10):
|
||||
try:
|
||||
self.osc.wait_status(ABR_BUSY, timeout=1)
|
||||
except RuntimeWarning as ex:
|
||||
logger.error(f"{ex} --- trying start again.")
|
||||
self.osc.kickoff()
|
||||
# def start_exposure(self):
|
||||
# """Starts the previously configured exposure."""
|
||||
# self.wait_for_movements()
|
||||
# self.osc.taskStart.set(1).wait()
|
||||
# for _ in range(10):
|
||||
# try:
|
||||
# self.osc.wait_status(ABR_BUSY, timeout=1)
|
||||
# except RuntimeWarning as ex:
|
||||
# logger.error(f"{ex} --- trying start again.")
|
||||
# self.osc.kickoff()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
from .mx_measurements import (MeasureFastOmega, MeasureStandardWedge, MeasureVerticalLine,
|
||||
MeasureScanSlits, MeasureRasterSimple, MeasureScreening, MeasureRepeatedOscillation,
|
||||
MeasureMSOX, MeasureGrid)
|
||||
from .mx_measurements import (MeasureStandardWedge, MeasureVerticalLine,
|
||||
MeasureRasterSimple, MeasureScreening)
|
||||
|
||||
@@ -11,9 +11,6 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
FULL_PERIOD = 0
|
||||
HALF_PERIOD = 1
|
||||
|
||||
|
||||
class AbrCmd:
|
||||
""" Valid Aerotech ABR scan commands from the AeroBasic files"""
|
||||
@@ -22,23 +19,23 @@ class AbrCmd:
|
||||
MEASURE_STANDARD = 2
|
||||
VERTICAL_LINE_SCAN = 3
|
||||
SCREENING = 4
|
||||
SUPER_FAST_OMEGA = 5
|
||||
STILL_WEDGE = 6 # NOTE: Unused
|
||||
STILLS = 7 # NOTE: Unused
|
||||
REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
|
||||
SINGLE_OSCILLATION = 9
|
||||
OLD_FASHIONED = 10 # NOTE: Unused
|
||||
RASTER_SCAN = 11
|
||||
JET_ROTATION = 12 # NOTE: Unused
|
||||
X_HELICAL = 13 # NOTE: Unused
|
||||
X_RUNSEQ = 14 # NOTE: Unused
|
||||
JUNGFRAU = 15
|
||||
MSOX = 16 # NOTE: Unused
|
||||
SLIT_SCAN = 17 # NOTE: Unused
|
||||
RASTER_SCAN_STILL = 18
|
||||
SCAN_SASTT = 19
|
||||
SCAN_SASTT_V2 = 20
|
||||
SCAN_SASTT_V3 = 21
|
||||
# SUPER_FAST_OMEGA = 5 # Some Japanese wanted to measure samples in capilaries at high RPMs
|
||||
# STILL_WEDGE = 6 # NOTE: Unused Step scan
|
||||
# STILLS = 7 # NOTE: Unused Just send triggers to detector
|
||||
# REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
|
||||
# SINGLE_OSCILLATION = 9
|
||||
# OLD_FASHIONED = 10 # NOTE: Unused
|
||||
# RASTER_SCAN = 11
|
||||
# JET_ROTATION = 12 # NOTE: Unused
|
||||
# X_HELICAL = 13 # NOTE: Unused
|
||||
# X_RUNSEQ = 14 # NOTE: Unused
|
||||
# JUNGFRAU = 15
|
||||
# MSOX = 16 # NOTE: Unused
|
||||
# SLIT_SCAN = 17 # NOTE: Unused
|
||||
# RASTER_SCAN_STILL = 18
|
||||
# SCAN_SASTT = 19
|
||||
# SCAN_SASTT_V2 = 20
|
||||
# SCAN_SASTT_V3 = 21
|
||||
|
||||
|
||||
|
||||
@@ -59,13 +56,12 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
abr_complete : bool
|
||||
Wait for the launched ABR task to complete.
|
||||
"""
|
||||
scan_type = "measure"
|
||||
scan_report_hint = "table"
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
|
||||
# Aerotech stage config
|
||||
abr_command = None
|
||||
abr_globals = {}
|
||||
abr_raster_reset =False
|
||||
abr_complete = False
|
||||
abr_timeout = None
|
||||
@@ -77,12 +73,6 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
parameters.
|
||||
"""
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
# Override if explicitly passed as parameter
|
||||
if "abr_command" in self.caller_kwargs:
|
||||
self.abr_command = self.caller_kwargs.get("abr_command")
|
||||
if "abr_globals" in self.caller_kwargs:
|
||||
self.abr_globals = self.caller_kwargs.get("abr_globals")
|
||||
if "abr_raster_reset" in self.caller_kwargs:
|
||||
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
|
||||
if "abr_complete" in self.caller_kwargs:
|
||||
@@ -91,6 +81,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
|
||||
|
||||
def pre_scan(self):
|
||||
""" Mostly just checking if ABR stage is ok..."""
|
||||
# TODO: Move roughly to start position???
|
||||
|
||||
# ABR status checking
|
||||
@@ -107,13 +98,6 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
if self.abr_raster_reset:
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "raster_scan_done.set", 0)
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.abr_command}
|
||||
d.update(self.abr_globals)
|
||||
logger.info(d)
|
||||
# Configure
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
|
||||
@@ -125,7 +109,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
""" The actual scan logic comes here.
|
||||
"""
|
||||
# Kick off the run
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "start_command.set", 1)
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
|
||||
logger.info("Measurement launched on the ABR stage...")
|
||||
|
||||
# Wait for grid scanner to finish
|
||||
@@ -168,28 +152,9 @@ class MeasureStandardWedge(AerotechFlyscanBase):
|
||||
ready_rate : float, optional
|
||||
No clue what is this... (default=500)
|
||||
"""
|
||||
scan_name = "standard_wedge"
|
||||
scan_name = "standardscan"
|
||||
required_kwargs = ["start", "range", "move_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_move_time = parameter['kwargs'].get("move_time")
|
||||
self.scan_ready_rate = parameter['kwargs'].get("ready_rate", 500)
|
||||
self.abr_command = AbrCmd.MEASURE_STANDARD
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_move_time,
|
||||
"var_4" : self.scan_ready_rate,
|
||||
}
|
||||
logger.info(self.abr_globals)
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
|
||||
class MeasureVerticalLine(AerotechFlyscanBase):
|
||||
@@ -208,75 +173,15 @@ class MeasureVerticalLine(AerotechFlyscanBase):
|
||||
|
||||
Parameters
|
||||
----------
|
||||
range_y : float
|
||||
range : float
|
||||
Step size [mm].
|
||||
steps_y : int
|
||||
steps : int
|
||||
Scan range of the axis.
|
||||
exp_time : float
|
||||
Eeffective exposure time per step [s]
|
||||
"""
|
||||
scan_name = "vertical_line"
|
||||
required_kwargs = ["exp_time", "range_y", "steps_y"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.gmy']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_range_y = parameter['kwargs'].get("range_y")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y
|
||||
|
||||
self.abr_command = AbrCmd.VERTICAL_LINE_SCAN
|
||||
self.abr_globals = {
|
||||
'var_1': self.scan_exp_time,
|
||||
'var_2': self.scan_stepsize_y,
|
||||
'var_3': self.scan_stepnum_y,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureScanSlits(AerotechFlyscanBase):
|
||||
""" Coordinated scan
|
||||
|
||||
Measure a hardware coordinated relative line scan with the X- and Y axes.
|
||||
This is used to accurately track solid supports with long linear grooves,
|
||||
as theese might be slightly rotated.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.slit_scan(range=[10, 0.3], scan_time=20)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
range_x : float
|
||||
Move distance in X [mm].
|
||||
range_y : float
|
||||
Move distance in Y [mm].
|
||||
velocity : float
|
||||
Effective movement velocity [mm/s].
|
||||
"""
|
||||
scan_name = "slit_scan"
|
||||
required_kwargs = ["range", "velocity"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ["abr.gmx", "abr.gmy"]
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.SLIT_SCAN
|
||||
self.scan_range_x = parameter['kwargs'].get("range_x")
|
||||
self.scan_range_y = parameter['kwargs'].get("range_y")
|
||||
self.scan_velocity = parameter['kwargs'].get("velocity")
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_range_x,
|
||||
"var_2" : self.scan_range_y,
|
||||
"var_3" : self.scan_velocity,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
scan_name = "vlinescan"
|
||||
required_kwargs = ["exp_time", "range", "steps"]
|
||||
|
||||
|
||||
|
||||
@@ -308,120 +213,9 @@ class MeasureRasterSimple(AerotechFlyscanBase):
|
||||
steps_y : int
|
||||
Number of scan steps in Y (slow).
|
||||
"""
|
||||
scan_name = "raster_simple"
|
||||
scan_name = "rasterscan"
|
||||
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.gmx', 'abr.gmy']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_range_x = parameter['kwargs'].get("range_x")
|
||||
self.scan_range_y = parameter['kwargs'].get("range_y")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("steps_x")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x
|
||||
self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y
|
||||
|
||||
self.abr_command = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
self.abr_raster_reset = True
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_exp_time,
|
||||
"var_2" : self.scan_stepsize_x,
|
||||
"var_3" : self.scan_stepsize_y,
|
||||
"var_4" : self.scan_stepnum_x,
|
||||
"var_5" : self.scan_stepnum_y,
|
||||
"var_6" : 0,
|
||||
"var_7" : 0,
|
||||
"var_8" : 0,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureSASTT(AerotechFlyscanBase):
|
||||
""" Small angle scanning tensor tomography scan
|
||||
|
||||
Measure a single projection for Small Angle Scanning Tensor Tomography.
|
||||
It's essentially a separate grid scan, that can be modified independently.
|
||||
The scan is relative and assumes the goniometer is currently at the CENTER
|
||||
of the first cell (i.e. TOP-LEFT). Each line is executed as a single
|
||||
continous movement, there's no actual stepping in the X direction.
|
||||
|
||||
NOTE: Not all beamlines have all SASTT modes implemented.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_sastt(exp_time=0.01, range_x=1, range_y=1, steps_x=200, steps_y=200)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
exp_time : float
|
||||
Effective exposure time for each cell along the X axis [s].
|
||||
range_x : float
|
||||
Scan step size [mm].
|
||||
range_y : float
|
||||
Scan step size [mm].
|
||||
steps_x : int
|
||||
Number of scan steps in X (fast). Only used for velocity calculation.
|
||||
steps_y : int
|
||||
Number of scan steps in Y (slow).
|
||||
odd_tweak : (float)
|
||||
Distance to be added before the open shutter command [mm].
|
||||
Used only in scan version=3. Should be small (default: 0)!
|
||||
even_tweak : (float)
|
||||
Distance to be added before the open shutter command [mm].
|
||||
Used only in scan version=3. Should be small (default: 0)!
|
||||
version : (int)
|
||||
Scanning mode for tensor tomo.
|
||||
1 = original snake scan, single PSO window
|
||||
2 = scan always from LEFT---RIGHT
|
||||
3 = snake scan alternating PSO window for even/odd rows
|
||||
"""
|
||||
scan_name = "measure_sastt"
|
||||
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.gmx', 'abr.gmy']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_version = int(parameter['kwargs'].get("version", 1))
|
||||
if self.scan_version==1:
|
||||
self.abr_command = AbrCmd.SCAN_SASTT
|
||||
if self.scan_version==2:
|
||||
self.abr_command = AbrCmd.SCAN_SASTT_V2
|
||||
if self.scan_version==3:
|
||||
self.abr_command = AbrCmd.SCAN_SASTT_V3
|
||||
else:
|
||||
raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}")
|
||||
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_range_x = parameter['kwargs'].get("range_x")
|
||||
self.scan_range_y = parameter['kwargs'].get("range_y")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("steps_x")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_even_tweak = parameter['kwargs'].get("even_tweak", 0)
|
||||
self.scan_odd_tweak = parameter['kwargs'].get("odd_tweak", 0)
|
||||
self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x
|
||||
self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y
|
||||
|
||||
self.abr_globals = {
|
||||
'var_1': self.scan_exp_time,
|
||||
'var_2': self.scan_stepsize_x,
|
||||
'var_3': self.scan_stepsize_y,
|
||||
'var_4': self.scan_stepnum_x,
|
||||
'var_5': self.scan_stepnum_y,
|
||||
'var_6': self.scan_even_tweak,
|
||||
'var_7': self.scan_odd_tweak,
|
||||
'var_8': 0,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -454,588 +248,5 @@ class MeasureScreening(AerotechFlyscanBase):
|
||||
delta : float
|
||||
Safety margin for sub-range movements (default: 0.5) [deg].
|
||||
"""
|
||||
scan_name = "measure_screening"
|
||||
scan_name = "screeningscan"
|
||||
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['gmy']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range_o = parameter['kwargs'].get("range")
|
||||
self.scan_stepnum_o = parameter['kwargs'].get("steps")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_oscrange = parameter['kwargs'].get("oscrange")
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
|
||||
self.scan_stepsize_o = self.scan_range_o / self.scan_stepnum_o
|
||||
self.abr_command = AbrCmd.SCREENING
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_oscrange,
|
||||
"var_3" : self.scan_exp_time,
|
||||
"var_4" : self.scan_stepsize_o,
|
||||
"var_5" : self.scan_stepnum_o,
|
||||
"var_6" : self.scan_delta,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureFastOmega(AerotechFlyscanBase):
|
||||
""" Fast omega scan
|
||||
|
||||
Performs a fast rotational scan with the rotation stage (omega). It ramps
|
||||
up to constant speed and sets off PSO for the expected travel time. I.e.
|
||||
not really a PSO output (as it ignores acceleration distance) but it's
|
||||
expected to trigger at-speed for 'range' distance.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_fast_omega(start=42, range=180, move_time=1.0)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start : float
|
||||
Scan start position of the axis [deg].
|
||||
range : float
|
||||
At-speed range of the axis relative to 'start' + acceleration
|
||||
distance [deg].
|
||||
move_time : float
|
||||
Total time for the at-speed movement and trigger [s].
|
||||
rate : (???)
|
||||
(default: 200).
|
||||
"""
|
||||
scan_name = "measure_fast_omega"
|
||||
required_kwargs = ["start", "range", "move_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['omega']
|
||||
# Set parameters from meaningful inputs
|
||||
logger.info(parameter)
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_move_time = parameter['kwargs'].get("move_time")
|
||||
self.scan_acceleration = parameter['kwargs'].get("acceleration", 200)
|
||||
self.abr_command = AbrCmd.SUPER_FAST_OMEGA
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_move_time,
|
||||
"var_4" : self.scan_acceleration,
|
||||
}
|
||||
logger.info(self.abr_globals)
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureStillWedge(AerotechFlyscanBase):
|
||||
""" Still wedge scan
|
||||
|
||||
Measure a simple step scan with the omega stage with PSO triggering.
|
||||
The scan is similar to the continous wedge scan.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_still_wedge(start=42, range=180, exp_time=0.1, steps=60)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start : (float)
|
||||
Scan start position of the axis [mm].
|
||||
range : (float)
|
||||
Scan range of the omega axis [mm].
|
||||
exp_time : (float)
|
||||
Exposure time per point [s].
|
||||
steps : int
|
||||
Number of actual steps.
|
||||
sleep_after_shutter_close : (float)
|
||||
[s] (default: 0.01).
|
||||
"""
|
||||
scan_name = "measure_still_wedge"
|
||||
required_kwargs = ["start", "range", "exp_time", "steps"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_stepnum_o = parameter['kwargs'].get("steps")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_shutter_sleep = parameter['kwargs'].get("sleep_after_shutter_close", 0.01)
|
||||
self.scan_stepsize_o = self.scan_stepnum_o / self.scan_stepnum_o
|
||||
self.abr_command = AbrCmd.STILL_WEDGE
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_exp_time,
|
||||
"var_4" : self.scan_stepsize_o,
|
||||
"var_5" : self.scan_shutter_sleep,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureStills(AerotechFlyscanBase):
|
||||
""" Still image sequence scan
|
||||
|
||||
Measures a series of PSO triggered images without any motor movement.
|
||||
NOTE: Use idle_time=0.0 to prevent shutter open/close.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_stills(steps=100, exp_time=0.1, idle_time=3)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
steps : int
|
||||
Total number of frames to be recorded.
|
||||
exp_time : float
|
||||
Exposure time of each frame [s].
|
||||
idle_time : float
|
||||
Sleep time between the frames [s].
|
||||
"""
|
||||
scan_name = "measure_stills"
|
||||
required_kwargs = ["exp_time", "steps", "idle_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = []
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_numsteps = parameter['kwargs'].get("steps")
|
||||
self.scan_idle_time = parameter['kwargs'].get("idle_time")
|
||||
self.abr_command = AbrCmd.STILLS
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_numsteps,
|
||||
"var_2" : self.scan_exp_time,
|
||||
"var_3" : self.scan_idle_time,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureSingleOscillation(AerotechFlyscanBase):
|
||||
""" Single oscillation scan
|
||||
|
||||
Short line scan with the omega axis with PSO trigger.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_single_osc(start=42, range=180, move_time=20)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start : float
|
||||
Scan start position of the omegaaxis [mm].
|
||||
range : float
|
||||
Oscillation range of the axis [mm].
|
||||
move_time : float
|
||||
Total scan time for the movement [s].
|
||||
delta : (???)
|
||||
Safety margin for movement (default: 0).
|
||||
settle : (???)
|
||||
(default: 0.5).
|
||||
"""
|
||||
scan_name = "measure_single_osc"
|
||||
required_kwargs = ["start", "range", "move_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_move_time = parameter['kwargs'].get("move_time")
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0)
|
||||
self.scan_settle = parameter['kwargs'].get("settle", 0.5)
|
||||
self.abr_command = AbrCmd.SINGLE_OSCILLATION
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_move_time,
|
||||
"var_4" : self.scan_delta,
|
||||
"var_5" : self.scan_settle,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureRepeatedOscillation(AerotechFlyscanBase):
|
||||
""" Repeated oscillation scan
|
||||
|
||||
Repeated relative line scans with the omega axis with PSO enable trigger.
|
||||
The lines are performed in zigzag mode with PSO triggering.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_multi_osc(steps=50, range=30, move_time=3)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
steps : int
|
||||
Number of cycles to repeat the scan
|
||||
move_time : float
|
||||
Total scan time for the movement [s].
|
||||
range : float
|
||||
Relative oscillation range of the omega axis [deg].
|
||||
settle : (???)
|
||||
Movement settle time after each line (default: 0) [s].
|
||||
delta : (???)
|
||||
Safety margin for movement (default: 0.5) [deg].
|
||||
"""
|
||||
scan_name = "measure_multi_osc"
|
||||
required_kwargs = ["steps", "range", "scan_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_stepnum = parameter['kwargs'].get("steps")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_move_time = parameter['kwargs'].get("move_time")
|
||||
self.scan_sleep = parameter['kwargs'].get("sleep", 0)
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
|
||||
self.abr_command = AbrCmd.REPEAT_SINGLE_OSCILLATION
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_stepnum,
|
||||
"var_2" : self.scan_move_time,
|
||||
"var_3" : self.scan_range,
|
||||
"var_4" : self.scan_sleep,
|
||||
"var_5" : self.scan_delta,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureMSOX(AerotechFlyscanBase):
|
||||
""" Standard MSOX scan
|
||||
|
||||
MSOX experiment for Florian, i.e. a glorified oscillation scan this time
|
||||
with absolute positions. The lines are unidirectional with PSO triggering.
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_msox(start=42, range=30, move_time=3.0)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start : float
|
||||
Scan start position of the axis.
|
||||
range : float
|
||||
Scan range of the axis.
|
||||
move_time : float
|
||||
Exposure time for each point [s].
|
||||
steps : int
|
||||
Number of repeats.
|
||||
settle_time : float
|
||||
Settle time before line start (default=0) [s].
|
||||
"""
|
||||
scan_name = "measure_msox"
|
||||
required_kwargs = ["start", "range", "move_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_move_time = parameter['kwargs'].get("move_time")
|
||||
self.scan_stepnum = parameter['kwargs'].get("steps", 1)
|
||||
self.scan_settle_time = parameter['kwargs'].get("settle_time", 0)
|
||||
self.abr_command = AbrCmd.MSOX
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_move_time,
|
||||
"var_4" : self.scan_stepnum,
|
||||
"var_5" : self.scan_settle_time,
|
||||
"var_6" : 0,
|
||||
"var_7" : 0,
|
||||
"var_8" : 0,
|
||||
"var_9" : 0,
|
||||
"var_10" : 0,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureGrid(AerotechFlyscanBase):
|
||||
""" General grid scan
|
||||
|
||||
Performs an X-Y-Omega coordinated grid scan:
|
||||
X axis is absolute (fast axis)
|
||||
Y axis is relative to start
|
||||
Omega starts at the current value and is synchronized to X
|
||||
|
||||
Note: This was probably never used in its current form, because the
|
||||
code had an undefined variable 'self.position'
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_raster(start=42, range=10, exp_time=0.1)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
positions : 2D-array
|
||||
Scan position of each axis, i.e. (N, 3) shaped array.
|
||||
steps_x : int
|
||||
Number of points along the X axis (fast).
|
||||
steps_y : int
|
||||
Number of points in the Y axis (slow).
|
||||
angle:
|
||||
Triggered angular range on the synchronized omega [deg]
|
||||
exp_time : (float)
|
||||
Exposure time for each point, only used for number of points and velocity [s].
|
||||
"""
|
||||
scan_name = "measure_raster"
|
||||
required_kwargs = ["positions", "ncols", "angle", "exp_time"]
|
||||
|
||||
# Default values
|
||||
scan_start_x = None
|
||||
scan_end_x = None
|
||||
scan_xstep = 0.010
|
||||
scan_ystep = 0.010
|
||||
scan_zstep = 0.010
|
||||
scan_gmy_step = 0.010
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.omega', 'abr.gmx', 'abr.gmy']
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.RASTER_SCAN
|
||||
self.scan_positions = parameter['kwargs'].get("positions")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("steps_x")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_anglerange = parameter['kwargs'].get("angle")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
|
||||
if len(self.scan_positions) != self.scan_stepnum_x * self.scan_stepnum_y:
|
||||
raise RuntimeError("Number of points and positions do not match")
|
||||
|
||||
if len(self.scan_positions)==1:
|
||||
raise RuntimeWarning("Raster scan with one cell makes no sense")
|
||||
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def prepare_positions(self):
|
||||
# Calculate step sizes
|
||||
pos_start = self.scan_positions[0] # first cell on first row
|
||||
pos_col_2nd = self.scan_positions[1] # second cell on first row
|
||||
pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row
|
||||
|
||||
self.scan_xstep = pos_col_2nd[0] - pos_start[0]
|
||||
half_cell = abs(self.scan_xstep) / 2.0
|
||||
self.scan_start_x = pos_start[0] - half_cell
|
||||
self.scan_end_x = pos_col_end[0] + half_cell
|
||||
|
||||
if self.scan_stepnum_y > 1:
|
||||
pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row
|
||||
self.scan_ystep = pos_row_2nd[1] - pos_start[1]
|
||||
self.scan_zstep = pos_row_2nd[2] - pos_start[2]
|
||||
self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep])
|
||||
|
||||
# ToDo: Check with Zac what are theese parameters
|
||||
self.scan_start_o = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get")
|
||||
self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get")
|
||||
|
||||
# Configure the global variables
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start_o,
|
||||
"var_2" : self.scan_start_x,
|
||||
"var_3" : self.scan_end_x,
|
||||
"var_4" : self.scan_stepnum_x,
|
||||
"var_5" : self.scan_stepnum_y,
|
||||
"var_6" : self.scan_anglerange,
|
||||
"var_7" : self.scan_exp_time,
|
||||
"var_8" : HALF_PERIOD,
|
||||
"var_9" : self.scan_gmx_offset,
|
||||
"var_10" : self.scan_gmy_step,
|
||||
}
|
||||
|
||||
|
||||
|
||||
class MeasureGridStill(AerotechFlyscanBase):
|
||||
""" Still grid scan
|
||||
|
||||
Performs an X-Y-Omega coordinated grid scan in stepping mode:
|
||||
X axis is absolute
|
||||
Y axis is relative to start
|
||||
Omega starts at the current value and is synchronized to X
|
||||
|
||||
Note: This was probably never used in its current form, because the
|
||||
code had an undefined variable 'self.position'
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> pts = [[11, 2, 3.4], [11, 2, 3.5], ...]
|
||||
>>> scans.measure_raster(positions=pts, steps_x=20, steps_y=20, exp_time=0.1, delay=0.05)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
positions : 2D-array
|
||||
Scan position of each axis, i.e. (N, 3) shaped array.
|
||||
columns : int
|
||||
Nmber of columns.
|
||||
angle:
|
||||
???
|
||||
exp_time : float
|
||||
Exposure time for each point [s].
|
||||
delay:
|
||||
???
|
||||
"""
|
||||
scan_name = "measure_raster_still"
|
||||
required_kwargs = ["positions", "steps_x", "steps_y", "exp_time", "delay"]
|
||||
|
||||
# Default values
|
||||
scan_start_x = None
|
||||
scan_end_x = None
|
||||
scan_xstep = 0.010
|
||||
scan_ystep = 0.010
|
||||
scan_zstep = 0.010
|
||||
scan_gmy_step = 0.010
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['abr.gmx', 'abr.gmy', 'abr.omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.RASTER_SCAN_STILL
|
||||
self.scan_positions = parameter['kwargs'].get("positions")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("steps_x")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_delay = parameter['kwargs'].get("delay")
|
||||
|
||||
if len(self.scan_positions) != self.scan_stepnum_x * self.scan_stepnum_y:
|
||||
raise RuntimeError("Number of points and positions do not match")
|
||||
|
||||
if len(self.scan_positions)==1:
|
||||
raise RuntimeWarning("Raster scan with one cell makes no sense")
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def prepare_positions(self):
|
||||
# Calculate step sizes
|
||||
pos_start = self.scan_positions[0] # first cell on first row
|
||||
pos_col_2nd = self.scan_positions[1] # second cell on first row
|
||||
pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row
|
||||
|
||||
self.scan_xstep = pos_col_2nd[0] - pos_start[0]
|
||||
half_cell = abs(self.scan_xstep) / 2.0
|
||||
self.scan_start_x = pos_start[0] - half_cell
|
||||
self.scan_end_x = pos_col_end[0] + half_cell
|
||||
|
||||
if self.scan_stepnum_y > 1:
|
||||
pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row
|
||||
self.scan_ystep = pos_row_2nd[1] - pos_start[1]
|
||||
self.scan_zstep = pos_row_2nd[2] - pos_start[2]
|
||||
self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep])
|
||||
|
||||
# ToDo: Check with Zac what are theese parameters
|
||||
self.scan_omega_position = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get")
|
||||
self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get")
|
||||
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_omega_position,
|
||||
"var_2" : self.scan_start_x,
|
||||
"var_3" : self.scan_end_x,
|
||||
"var_4" : self.scan_stepnum_x,
|
||||
"var_5" : self.scan_stepnum_y,
|
||||
"var_6" : self.scan_delay,
|
||||
"var_7" : self.scan_exp_time,
|
||||
"var_8" : HALF_PERIOD,
|
||||
"var_9" : self.scan_gmx_offset,
|
||||
"var_10" : self.scan_gmy_step,
|
||||
}
|
||||
|
||||
|
||||
|
||||
class MeasureJungfrau(AerotechFlyscanBase):
|
||||
""" Scan with the Jungfrau detector
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
controller. Ophyd just configures, launches it and waits for completion.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> scans.measure_jungfrau(start=42, range=180, scan_time=20)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
steps_x : int
|
||||
???
|
||||
steps_y : int
|
||||
???
|
||||
range_x : int
|
||||
???
|
||||
range_y : int
|
||||
???
|
||||
exp_time : float
|
||||
Exposure time per point [s].
|
||||
sleep : float
|
||||
(default: 0.1).
|
||||
"""
|
||||
scan_name = "measure_jungfrau"
|
||||
required_kwargs = ["steps_x", "steps_y", "range_x", "range_y", "exp_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("steps_x")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("steps_y")
|
||||
self.scan_range_x = parameter['kwargs'].get("range_x")
|
||||
self.scan_range_y = parameter['kwargs'].get("range_y")
|
||||
self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x
|
||||
self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_sleep = parameter['kwargs'].get("sleep", 0.1)
|
||||
sleep_time = 5.0 + (self.scan_stepnum_y * self.scan_sleep) + (
|
||||
self.scan_stepnum_x * self.scan_stepnum_y * self.scan_exp_time)
|
||||
self.abr_timeout = None if self.scan_sleep <= 0 else sleep_time
|
||||
self.abr_command = AbrCmd.JUNGFRAU
|
||||
self.abr_complete = True
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_stepnum_x,
|
||||
"var_2" : self.scan_stepnum_y,
|
||||
"var_3" : self.scan_stepsize_x,
|
||||
"var_4" : self.scan_stepsize_y,
|
||||
"var_5" : self.scan_exp_time,
|
||||
"var_6" : self.scan_sleep,
|
||||
"var_7" : 0,
|
||||
"var_8" : 0,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def scan_core(self):
|
||||
""" The actual scan logic comes here.
|
||||
"""
|
||||
yield from super().scan_core()
|
||||
|
||||
# Go back to direct mode
|
||||
st = yield from self.stubs.send_rpc_and_wait("abr", "set_axis_mode", "direct")
|
||||
st.wait()
|
||||
Reference in New Issue
Block a user