Starting to move things to scans

This commit is contained in:
Unknown MX Person
2024-08-27 17:58:33 +02:00
parent e7fd8e453d
commit 13c6d7b8fb
3 changed files with 1007 additions and 150 deletions

View File

@@ -103,16 +103,12 @@ Examples
"""
import time
import math
from beamline import BEAMLINE
from epics import PV, poll
from epics.ca import pend_io
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status
from ophyd.status import MoveStatus, SubscriptionStatus, DeviceStatus
class A3200Axis(PVPositioner):
from aeroA3200 import A3200Axis, A3200RasterScanner, A3200Oscillator
ABR_DONE = 0
@@ -131,17 +127,6 @@ SHUTTER_OPEN = 1
FULL_PERIOD = 0
HALF_PERIOD = 1
def pv_wait(pv, val, timeout=10.0):
timeout = timeout + time.time()
timeisup = False
while val != pv.get() and not timeisup:
poll(0.01)
timeisup = time.time() > timeout
if timeisup:
raise RuntimeWarning("timeout waiting for %s to reach %s" % (pv.pvname, str(val)))
CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2
@@ -196,9 +181,9 @@ class AerotechAbrStage(Device):
gmz = Component(A3200Axis, "-ES-DF1:GMZ", kind=Kind.hinted)
_zcmd = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
_stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
scan_command = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
_var_1 = Component(EpicsSignal, "-ES-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
_var_2 = Component(EpicsSignal, "-ES-PSO:VAR-2", put_complete=True, kind=Kind.omitted)
@@ -216,23 +201,17 @@ class AerotechAbrStage(Device):
raster_num_rows = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
def __init__(self):
# def __init__(self):
# self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS")
# self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE")
# self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME")
# self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC")
# self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START")
# self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE")
self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS")
self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE")
self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME")
self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC")
self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START")
self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE")
pend_io(10)
# pend_io(10)
def command(self, cmd):
self._zcmd.set(cmd).wait()
def start_command(self):
self._start_command.set(1).wait()
def measure_standard(self, start, wedge, etime, ready_rate=500.0):
"""measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds"""
@@ -555,19 +534,48 @@ class AerotechAbrStage(Device):
self.start_command()
def configure(self, d: dict) -> tuple:
def configure_stills(self, frames, etime, sleep=0.0):
self._var_1.set(frames).wait()
self._var_2.set(etime).wait()
self._var_3.set(sleep).wait()
old = self.read_configuration()
# ToDo: Check if idle before reconfiguring
# Set the corresponding global variables
if "scan_command" in d:
self.scan_command.set(d['scan_command']).wait()
if "var_1" in d:
self._var_1.set(d['var_1']).wait()
if "var_2" in d:
self._var_2.set(d['var_2']).wait()
if "var_3" in d:
self._var_3.set(d['var_3']).wait()
if "var_4" in d:
self._var_4.set(d['var_4']).wait()
if "var_5" in d:
self._var_5.set(d['var_5']).wait()
if "var_6" in d:
self._var_6.set(d['var_6']).wait()
if "var_7" in d:
self._var_7.set(d['var_7']).wait()
if "var_8" in d:
self._var_8.set(d['var_8']).wait()
if "var_9" in d:
self._var_9.set(d['var_9']).wait()
if "var_10" in d:
self._var_10.set(d['var_10']).wait()
new = self.read_configuration()
return old, new
def complete(self, timeout=None) -> DeviceStatus:
""" ToDo: WTF was this device doing here? Whatever...
"""
return self.raster.complete()
def reset(self):
self.command(CMD_NONE)
self.set_direct_mode()
def raster_2d(self):
pass
def is_ioc_ok(self):
return 0 == self.status.get()
@@ -742,78 +750,7 @@ class AerotechAbrStage(Device):
print(" --- trying start again.")
self.osc.taskStart.set(1).wait()
def velo(self, axis: int, speed: float = None) -> float:
"""set's the speed on the given axis to speed"""
if AXIS_GMX == axis:
velo_pv = self.__pv_gmx_velo
min, max = 0.01, 100.0
elif AXIS_GMY == axis:
velo_pv = self.__pv_gmy_velo
min, max = 0.01, 10.0
elif AXIS_GMZ == axis:
velo_pv = self.__pv_gmz_velo
min, max = 0.01, 10.0
elif AXIS_OMEGA == axis:
velo_pv = self.__pv_omega_velo
min, max = 0.01, 720.0
else:
raise Exception("unknown axis index")
if speed is not None:
if not (min <= speed <= max):
raise Exception(f"requested speed {speed} outside range {min} < speed < {max}")
velo_pv.put(speed)
poll(0.05)
return velo_pv.get()
def select_axis(self, axis):
"""returns PVs and limits for the given axis
axis: Aerotech.AXIS_{OMEGA=1, GMX=2, GMY=3, GMZ=4}
return (val_pv, rbv_pv, velo_pv, done_pv, velomin, velomax)
"""
if AXIS_GMX == axis:
val_pv = self.__pv_gmx_val
rbv_pv = self.__pv_gmx_rbv
velo_pv = self.__pv_gmx_velo
done_pv = self.__pv_gmx_done
min, max = 0.01, 100.0
elif AXIS_GMY == axis:
val_pv = self.__pv_gmy_val
rbv_pv = self.__pv_gmy_rbv
velo_pv = self.__pv_gmy_velo
done_pv = self.__pv_gmy_done
min, max = 0.01, 10.0
elif AXIS_GMZ == axis:
val_pv = self.__pv_gmz_val
rbv_pv = self.__pv_gmz_rbv
velo_pv = self.__pv_gmz_velo
done_pv = self.__pv_gmz_done
min, max = 0.01, 10.0
elif AXIS_OMEGA == axis:
val_pv = self.__pv_omega_set
rbv_pv = self.__pv_omega_get
velo_pv = self.__pv_omega_velo
done_pv = self.__pv_omega_done
min, max = 0.01, 720.0
else:
raise Exception("unknown axis index")
return (val_pv, rbv_pv, velo_pv, done_pv, min, max)
def rbv(self):
x = self.__pv_gmx_rbv.get()
y = self.__pv_gmy_get.get()
z = self.__pv_gmz_get.get()
omega = self.__pv_omega_get.get()
return {"gmx": x, "gmy": y, "gmz": z, "omega": omega}
def move(self, position, wait=False, velo=None, direct=False, **kwargs) -> StatusBase:
def move(self, position, wait=False, velocity=None, relative=None, direct=False, **kwargs) -> MoveStatus:
"""Move omega to `angle` at `speed` deg/s and `wait` (or not).
If a `speed` is specified, it will be used for the movement but the speed
@@ -832,10 +769,10 @@ class AerotechAbrStage(Device):
Limits: 0 - 360
`wait` [boolean] wait or not for movement to finish.
"""
return self.omega.move(position, wait=wait, velo=velo, direct=direct, **kwargs)
return self.omega.move(position, wait=wait, velocity=velocity, relative=relative, direct=direct, **kwargs)
def set_shutter(self, state):
if self.__pv_axes_mode.get():
if self.axisAxesMode.get():
print("ABR is not in direct mode; cannot manipulate shutter")
return False
state = str(state).lower()
@@ -846,39 +783,42 @@ class AerotechAbrStage(Device):
state = 1
elif state == ["0", "closed"]:
state = 0
self.__pv_shutter_set.put(state, wait=True)
return state == self.__pv_shutter_get.get()
self._shutter.put(state, wait=True)
return state == self._shutter.get()
if __name__ == "__main__":
import argparse
from time import sleep
aero = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
aero.wait_for_connection()
parser = argparse.ArgumentParser(description="aerotech client")
parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true")
parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true")
parser.add_argument("--measure-standard", help="measure a standard dataset", action="store")
parser.add_argument("--omega", help="move omega to...", action="store")
parser.add_argument("--gmx", help="move gmx to...", action="store")
parser.add_argument("--gmy", help="move gmy to...", action="store")
parser.add_argument("--gmz", help="move gmz to...", action="store")
args = parser.parse_args()
# import argparse
# from time import sleep
abr = Abr()
if args.stop:
abr.stop()
# parser = argparse.ArgumentParser(description="aerotech client")
# parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true")
# parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true")
# parser.add_argument("--measure-standard", help="measure a standard dataset", action="store")
# parser.add_argument("--omega", help="move omega to...", action="store")
# parser.add_argument("--gmx", help="move gmx to...", action="store")
# parser.add_argument("--gmy", help="move gmy to...", action="store")
# parser.add_argument("--gmz", help="move gmz to...", action="store")
# args = parser.parse_args()
# abr = Abr()
# if args.stop:
# abr.stop()
elif args.sanitize_speeds:
abr.velo(AXIS_GMX, 50.0)
abr.velo(AXIS_GMY, 10.0)
abr.velo(AXIS_GMZ, 10.0)
abr.velo(AXIS_OMEGA, 180.0)
# elif args.sanitize_speeds:
# abr.velo(AXIS_GMX, 50.0)
# abr.velo(AXIS_GMY, 10.0)
# abr.velo(AXIS_GMZ, 10.0)
# abr.velo(AXIS_OMEGA, 180.0)
elif args.gmx:
abr.move_x(args.gmx)
# elif args.gmx:
# abr.move_x(args.gmx)
elif args.measure_standard:
start_angle, total_range, total_time = args.measure_standard.split(",")
abr.measure_standard(start_angle, total_range, total_time)
# elif args.measure_standard:
# start_angle, total_range, total_time = args.measure_standard.split(",")
# abr.measure_standard(start_angle, total_range, total_time)

View File

@@ -5,8 +5,19 @@ Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
from ophyd import Component, PVPositioner, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status
from ophyd import Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1
DIRECT_MODE = 0
MEASURING_MODE = 1
class A3200Axis(PVPositioner):
@@ -21,7 +32,7 @@ class A3200Axis(PVPositioner):
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
#setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velo = Component(EpicsSignal, "-SETV", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
# PV to issue native relative movements on the A3200
relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config)
@@ -30,9 +41,9 @@ class A3200Axis(PVPositioner):
ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config)
# HW status words
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.hinted)
ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.hinted)
fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.hinted)
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.normal)
ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.normal)
fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.normal)
# Rock movement
_rock = Component(EpicsSignal, "-ROCK", put_complete=True, kind=Kind.config)
@@ -56,9 +67,12 @@ class A3200Axis(PVPositioner):
self.vmax.set(vmax).wait()
def move(self, position, wait=True, velo=None, direct=False, **kwargs):
def move(self, position, velocity=None, wait=True, relative=False, direct=False, **kwargs) -> MoveStatus:
""" Native absolute movement on the A3200 """
# ToDo: Ensure we can stop (if a stop_signal is configured).
# Check if we're in direct movement mode
if self.parent is not None:
if self.parent.mode != DIRECT_MODE:
if direct:
@@ -66,8 +80,13 @@ class A3200Axis(PVPositioner):
else:
raise RuntimeError("ABR is not in direct mode!")
if velo is not None:
self.velo.set(velo).wait()
# Set velocity if provided
if velocity is not None:
self.velo.set(velocity).wait()
# Set up relative motion
if relative:
raise NotImplementedError("Relative movement is not yet supported by the ophyd device")
return super().move(position, wait=wait, **kwargs)
@@ -299,6 +318,27 @@ class A3200RasterScanner(Device):
return status
def complete(self, timeout=None) -> DeviceStatus:
if timeout is not None:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, isReady, timeout=timeout, settle_time=0.5)
return status
status = DeviceStatus(self, settle_time=0.5)
status.set_finished()
return status

View File

@@ -0,0 +1,877 @@
+ 119
0
import time
import numpy as np
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2
CMD_VERTICAL_LINE_SCAN = 3
CMD_SCREENING = 4
CMD_SUPER_FAST_OMEGA = 5
CMD_STILL_WEDGE = 6
CMD_STILLS = 7
CMD_REPEAT_SINGLE_OSCILLATION = 8
CMD_SINGLE_OSCILLATION = 9
CMD_OLD_FASHIONED = 10
CMD_RASTER_SCAN = 11
CMD_JET_ROTATION = 12
CMD_X_HELICAL = 13
CMD_X_RUNSEQ = 14
CMD_JUNGFRAU = 15
CMD_MSOX = 16
CMD_SLIT_SCAN = 17
CMD_RASTER_SCAN_STILL = 18
CMD_SCAN_SASTT = 19
CMD_SCAN_SASTT_V2 = 20
CMD_SCAN_SASTT_V3 = 21
class MxFlyscanBase(AsyncFlyScanBase):
""" Base class for MX flyscans
"""
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Just set num_pos=0 to avoid hanging
"""
self.num_pos = 0
super().__init__(parameter=parameter, **kwargs)
def stage(self):
""" ToDo: Sot sure if we should call super() here as it'd stage the whole beamline"""
return super().stage()
def scan_core(self):
""" The actual scan logic comes here.
"""
self.pointID = 0
# Kick off the run
yield from self.stubs.command("abr", "start_command.set", 1)
logger.info("Measurement launched on the ABR stage...")
# ToDo: Wait for scan to finish
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class MeasureStandardWedge(MxFlyscanBase):
""" Measure a standard wedge scan
"""
scan_name = "standard_wedge"
required_kwargs = ["start", "range"]
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Standard measurement scan
Measure a standard wedge from `start` to `start` + `wedge` during
`etime` on the Omega axis.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, scan_time=20)
Parameters
----------
start : (float, float)
Scan start position of the axis.
range : (float, float)
Scan range of the axis.
scan_time : (float)
Total scan time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_MEASURE_STANDARD
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = float(self.caller_kwargs.get("scan_time", 5))
self.scan_ready_rate = float(self.caller_kwargs.get("ready_rate", 500))
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_start)
yield from self.stubs.command("abr", "_var_2.set", self.scan_range)
yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time)
yield from self.stubs.command("abr", "_var_4.set", self.scan_ready_rate)
# Call super
yield from self.stubs.pre_scan()
class MeasureScanSlits(MxFlyscanBase):
""" Measure a slit scan
"""
scan_name = "slit_scan"
required_kwargs = ["start", "range"]
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Slit scan
Measure a standard slit scan.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.slit_scan(start=42, range=10, scan_time=20)
Parameters
----------
delta_x : (float, float)
Scan range with slit in x.
delta_y : (float, float)
Scan range with slit in y.
velocity : (float)
Scan velocity [mm/s]].
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = CMD_SLIT_SCAN
self.scan_delta_x = self.caller_kwargs.get("delta_x")
self.scan_delta_y = self.caller_kwargs.get("delta_y")
self.scan_velocity = self.caller_kwargs.get("velocity")
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_delta_x)
yield from self.stubs.command("abr", "_var_2.set", self.scan_delta_y)
yield from self.stubs.command("abr", "_var_3.set", self.scan_velocity)
# Call super
yield from self.stubs.pre_scan()
class MeasureRasterSimple(MxFlyscanBase):
""" Measure a grid scan
"""
scan_name = "raster_simple"
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Simnple raster scan
Measure a simplified raster scan, assuming the goniometer is currently
at the CENTER of TOP-LEFT CELL.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.raster_simple(exp_time=0.5, cell_width=0.2, cell_height=0.2, ncols=10, nrows=10)
Parameters
----------
exp_time : (float)
Exposure time per point [s].
cell_width : (float)
Scan step size [mm].
cell_height : (float)
Scan step size [mm].
ncols : (int)
Number of scan steps.
nrows : (int)
Number of scan steps.
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = CMD_RASTER_SCAN_SIMPLE
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time)
yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x)
yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y)
yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x)
yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y)
yield from self.stubs.command("abr", "_var_6.set", 0)
yield from self.stubs.command("abr", "_var_7.set", 0)
yield from self.stubs.command("abr", "_var_8.set", 0)
# ToDo: This line was left out
#self.raster._raster_num_rows = nrows
# Call super
yield from self.stubs.pre_scan()
class MeasureSASTT(MxFlyscanBase):
""" Measure a small angle scanning tensor tomography scan
"""
scan_name = "measure_sastt"
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Standard measurement scan
Measure a simplified grid scan, assuming the goniometer is currently
at the CENTER of the required grid.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_sastt(exp_time=1.0, cell_width=0.01, cell_height=0.01, ncols=100, nrows=100)
Parameters
----------
exp_time : (float)
Exposure time per point [s].
cell_width : (float)
Scan step size [mm].
cell_height : (float)
Scan step size [mm].
ncols : (int)
Number of scan steps.
nrows : (int)
Number of scan steps.
odd_tweak : (float)
Distance to be added before the open shutter command [mm].
Used only in scan version=3. Should be small (default: 0)!
even_tweak : (float)
Distance to be added before the open shutter command [mm].
Used only in scan version=3. Should be small (default: 0)!
version : (int)
Scanning mode for tensor tomo.
1 = original snake scan, single PSO window
2 = scan always from LEFT---RIGHT
3 = snake scan alternating PSO window for even/odd rows
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_version = int(self.caller_kwargs.get("version", 1))
if self.scan_version==1:
self.scan_command = CMD_SCAN_SASTT
if self.scan_version==2:
self.scan_command = CMD_SCAN_SASTT_V2
if self.scan_version==3:
self.scan_command = CMD_SCAN_SASTT_V3
else:
raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
self.scan_even_tweak = self.caller_kwargs.get("even_tweak", 0)
self.scan_odd_tweak = self.caller_kwargs.get("off_tweak", 0)
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time)
yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x)
yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y)
yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x)
yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y)
yield from self.stubs.command("abr", "_var_6.set", self.scan_even_tweak)
yield from self.stubs.command("abr", "_var_7.set", self.scan_odd_tweak)
yield from self.stubs.command("abr", "_var_8.set", 0)
# ToDo: This line was left out
# self.raster._raster_num_rows = nrows
# Call super
yield from self.stubs.pre_scan()
class MeasureVerticalLine(MxFlyscanBase):
""" Measure a vertical line using the GMY motor
Simple fly scan that performs a single scan vertically.
The line is nummCells * cellHeight long and the exposureTime is per cell.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_vertical_line(start=42, range=10, scan_time=20)
Parameters
----------
cell_height : (float)
Step size [mm].
nrows : (int)
Scan range of the axis.
exp_time : (float)
Exposure time per cell [s]
"""
scan_name = "measure_vertical_line"
required_kwargs = ["exp_time", "cell_height", "nrows"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['gmy']
self.scan_command = CMD_VERTICAL_LINE_SCAN
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_stepsize_y)
yield from self.stubs.command("abr", "_var_2.set", self.scan_stepnum_y)
yield from self.stubs.command("abr", "_var_3.set", self.scan_exp_time)
# Call super
yield from self.stubs.pre_scan()
class MeasureScreening(MxFlyscanBase):
""" Measure a sample screening program
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_screening(start=42, range=10, scan_time=20, degrees=180, frames=1800)
Parameters
----------
start : (float)
Scan start position of the axis.
range : (float)
Scan range of the axis.
scan_time : (float)
Total scan time for the movement [s].
degrees : (???)
frames : (???)
delta : (???)
(default: 0.5).
"""
scan_name = "measure_screening"
required_kwargs = ["start", "range", "scan_time", "degrees", "frames"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['gmy']
self.scan_command = CMD_SCREENING
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_degrees = self.caller_kwargs.get("degrees")
self.scan_frames = self.caller_kwargs.get("frames")
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_start)
yield from self.stubs.command("abr", "_var_2.set", self.scan_range)
yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time)
yield from self.stubs.command("abr", "_var_4.set", self.scan_degrees)
yield from self.stubs.command("abr", "_var_5.set", self.scan_frames)
yield from self.stubs.command("abr", "_var_6.set", self.scan_delta)
# Call super
yield from self.stubs.pre_scan()
class MeasureFastOmega(MxFlyscanBase):
""" Measure a fast omega scan
MEasures a fast rotational scan with the rotation stage (omega).
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_fast_omega(start=42, range=180, scan_time=20)
Parameters
----------
start : (float)
Scan start position of the axis.
range : (float)
Scan range of the axis.
scan_time : (float)
Total scan time for the movement [s].
rate : (???)
(default: 200).
"""
scan_name = "measure_fast_omega"
required_kwargs = ["start", "range", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_SUPER_FAST_OMEGA
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_rate = self.caller_kwargs.get("rate", 200)
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_start)
yield from self.stubs.command("abr", "_var_2.set", self.scan_range)
yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time)
yield from self.stubs.command("abr", "_var_4.set", self.scan_rate)
# Call super
yield from self.stubs.pre_scan()
class MeasureStillWedge(MxFlyscanBase):
""" Measure a still wedge scan
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_still_wedge(start=42, range=90, scan_time=20, oscrange=15)
Parameters
----------
start : (float)
Scan start position of the axis.
range : (float)
Scan range of the axis.
scan_time : (float)
Total scan time for the movement [s].
oscrange : (float)
sleep_after_shutter_close : (float)
[s] (default: 0.01).
"""
scan_name = "measure_still_wedge"
required_kwargs = ["start", "range", "exp_time", "oscrange"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_STILL_WEDGE
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_oscrange = self.caller_kwargs.get("oscrange")
self.scan_shutter_sleep = self.caller_kwargs.get("sleep_after_shutter_close", 0.01)
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_start)
yield from self.stubs.command("abr", "_var_2.set", self.scan_range)
yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time)
yield from self.stubs.command("abr", "_var_4.set", self.scan_oscrange)
yield from self.stubs.command("abr", "_var_5.set", self.scan_shutter_sleep)
# Call super
yield from self.stubs.pre_scan()
class MeasureStills(MxFlyscanBase):
""" Measure a still image sequence
Measures a series of still images without any motor movement.
IMPORTANT: use idle_time=0.0 to prevent shutter open/close
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_stils(nframes=100, exp_time=0.1, idle_time=30)
Parameters
----------
nframes : (int)
Total number of frames to be recorded.
exposure_time : (float)
Exposure time of each frame [s].
idle_time : (float)
Sleep time between the frames [s].
"""
scan_name = "measure_stils"
required_kwargs = ["exp_time", "nframes"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = CMD_STILLS
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_num_frames = int(self.caller_kwargs.get("nframes"))
self.scan_idle_time = self.caller_kwargs.get("idle_time")
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_num_frames)
yield from self.stubs.command("abr", "_var_2.set", self.scan_exp_time)
yield from self.stubs.command("abr", "_var_3.set", self.scan_idle_time)
# Call super
yield from self.stubs.pre_scan()
class MeasureSingleOscillation(MxFlyscanBase):
""" Measure a single oscillation
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_single_osc(start=42, range=180, scan_time=20)
Parameters
----------
start : (float)
Scan start position of the axis.
range : (float)
Oscillation range of the axis.
scan_time : (float)
Total scan time for the movement [s].
delta : (???)
(default: 0).
settle : (???)
(default: 0.5).
"""
scan_name = "measure_single_osc"
required_kwargs = ["start", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_SINGLE_OSCILLATION
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_delta = self.caller_kwargs.get("delta", 0)
self.scan_settle = self.caller_kwargs.get("settle", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Set the global variables
yield from self.stubs.command("abr", "scan_command.set", self.scan_command)
yield from self.stubs.command("abr", "_var_1.set", self.scan_start)
yield from self.stubs.command("abr", "_var_2.set", self.scan_range)
yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time)
yield from self.stubs.command("abr", "_var_4.set", self.scan_delta)
yield from self.stubs.command("abr", "_var_5.set", self.scan_settle)
# Call super
yield from self.stubs.pre_scan()
class MeasureRepeatedOscillation(MxFlyscanBase):
""" Measure oscillation repeatedly
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_multi_osc(start=42, range=180, scan_time=20)
Parameters
----------
nframes : (int)
???
scan_time : (float)
Total scan time for the movement [s].
range : (float)
Oscillation range of the axis.
settle : (???)
(default: 0).
delta : (???)
(default: 0.5).
"""
scan_name = "measure_multi_osc"
required_kwargs = ["start", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_REPEAT_SINGLE_OSCILLATION
self.scan_num_frames = self.caller_kwargs.get("nframes")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_sleep = self.caller_kwargs.get("sleep", 0)
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
"var_1" : self.scan_num_frames,
"var_2" : self.scan_scan_time,
"var_3" : self.scan_range,
"var_4" : self.scan_sleep,
"var_5" : self.scan_delta,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
class MeasureMSOX(MxFlyscanBase):
""" Standard MSOX scan
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_msox(start=42, range=10, exp_time=0.1)
Parameters
----------
start : (float, float)
Scan start position of the axis.
range : (float, float)
Scan range of the axis.
exp_time : (float)
Exposure time for each point [s].
num_datasets : int
???
rest_time : float
???
"""
scan_name = "measure_msox"
required_kwargs = ["start", "range", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_MSOX
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_num_datasets = int(self.caller_kwargs.get("num_datasets", 1))
self.scan_rest_time = self.caller_kwargs.get("rest_time", 0)
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Configure the global variables
d = {"scan_command" : self.scan_command,
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_exp_time,
"var_4" : self.scan_num_datasets,
"var_5" : self.scan_rest_time,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
class MeasureGrid(MxFlyscanBase):
""" General grid scan
Note: This was probably never used in its current form, because the
code had an undefined variable 'self.position'
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_msox(start=42, range=10, exp_time=0.1)
Parameters
----------
positions : 2D-array
Scan position of each axis, i.e. (N, 3) shaped array.
ncols : int
Nmber of columns.
angle:
???
exp_time : (float)
Exposure time for each point [s].
"""
scan_name = "measure_raster"
required_kwargs = ["positions", "ncols", "angle", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = CMD_RASTER_SCAN
self.scan_positions = self.caller_kwargs.get("positions")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x
self.scan_angle = self.caller_kwargs.get("angle")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
if len(self.scan_positions)==1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
# Good practice: new members only in __init__
self.scan_ystep = 0.010
self.scan_zstep = 0.010
self.scan_gmy_step = 0.010
def prepare_positions(self):
# Calculate step sizes
pos_start = self.scan_positions[0] # first cell on first row
pos_col_2nd = self.scan_positions[1] # second cell on first row
pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row
self.scan_xstep = pos_col_2nd[0] - pos_start[0]
half_cell = abs(self.scan_xstep) / 2.0
self.scan_start_x = pos_start[0] - half_cell
self.scan_end_x = pos_col_end[0] + half_cell
if self.scan_stepnum_y > 1:
pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row
self.scan_ystep = pos_row_2nd[1] - pos_start[1]
self.scan_zstep = pos_row_2nd[2] - pos_start[2]
self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep])
# ToDo : This was left out
# self._raster_starting_x = start_x
# self._raster_starting_y = y1
# self._raster_starting_z = z1
# self._raster_step_y = ystep
# self._raster_step_z = zstep
# self._raster_current_row = 0
# self._raster_num_rows = rows
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Configure the global variables
d = {"scan_command" : self.scan_command,
"var_1" : self.omega.position,
"var_2" : self.scan_start_x,
"var_3" : self.scan_end_x,
"var_4" : self.scan_stepnum_x,
"var_5" : self.scan_stepnum_y,
"var_6" : self.scan_angle,
"var_7" : self.scan_exp_time,
"var_8" : HALF_PERIOD,
"var_9" : self._gmx_offset.get,
"var_10" : self.scan_gmy_step,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()