First Aerotech scan works

This commit is contained in:
Unknown MX Person
2024-09-18 12:49:33 +02:00
parent 3bf21ff647
commit 23aadabfd1
5 changed files with 4470 additions and 359 deletions

2107
pxiii_bec/config_saved.yaml Normal file

File diff suppressed because it is too large Load Diff

2094
pxiii_bec/config_saved.yaml~ Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -220,25 +220,25 @@ class AerotechAbrStage(Device):
# Set the corresponding global variables
if "scan_command" in d:
self.scan_command.set(d['scan_command']).wait()
if "var_1" in d:
if "var_1" in d and d['var_1'] is not None:
self._var_1.set(d['var_1']).wait()
if "var_2" in d:
if "var_2" in d and d['var_2'] is not None:
self._var_2.set(d['var_2']).wait()
if "var_3" in d:
if "var_3" in d and d['var_3'] is not None:
self._var_3.set(d['var_3']).wait()
if "var_4" in d:
if "var_4" in d and d['var_4'] is not None:
self._var_4.set(d['var_4']).wait()
if "var_5" in d:
if "var_5" in d and d['var_5'] is not None:
self._var_5.set(d['var_5']).wait()
if "var_6" in d:
if "var_6" in d and d['var_6'] is not None:
self._var_6.set(d['var_6']).wait()
if "var_7" in d:
if "var_7" in d and d['var_7'] is not None:
self._var_7.set(d['var_7']).wait()
if "var_8" in d:
if "var_8" in d and d['var_8'] is not None:
self._var_8.set(d['var_8']).wait()
if "var_9" in d:
if "var_9" in d and d['var_9'] is not None:
self._var_9.set(d['var_9']).wait()
if "var_10" in d:
if "var_10" in d and d['var_10'] is not None:
self._var_10.set(d['var_10']).wait()
new = self.read_configuration()

View File

@@ -1 +1 @@
from .mx_measurements import MeasureStandardWedge
from .mx_measurements import MeasureFastOmega

View File

@@ -41,21 +41,66 @@ class AbrCmd:
SCAN_SASTT_V3 = 21
class MxFlyscanBase(AsyncFlyScanBase):
class AerotechFlyscanBase(AsyncFlyScanBase):
""" Base class for MX flyscans
I'd be quite easy to improve standardization.
Low-level base class for standard scans at the PX beamlines at SLS. Theese scans use the
A3200 rotation stage and the actual experiment is performed using an AeroBasic script
controlled via global variables.
"""
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config
abr_command = None
abr_globals = {}
abr_reset = False
abr_raster_reset =False
abr_complete = False
abr_timeout = None
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Just set num_pos=0 to avoid hanging
""" Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters.
"""
self.num_pos = 0
super().__init__(parameter=parameter, **kwargs)
# Override if explicitly passed as parameter
if "abr_command" in self.caller_kwargs:
self.abr_command = self.caller_kwargs.get("abr_command")
if "abr_globals" in self.caller_kwargs:
self.abr_globals = self.caller_kwargs.get("abr_globals")
if "abr_reset" in self.caller_kwargs:
self.abr_reset = self.caller_kwargs.get("abr_reset")
if "abr_raster_reset" in self.caller_kwargs:
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
if "abr_complete" in self.caller_kwargs:
self.abr_complete = self.caller_kwargs.get("abr_complete")
if "abr_timeout" in self.caller_kwargs:
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
if self.abr_reset:
yield from self.stubs.send_rpc_and_wait("abr", "reset")
if self.abr_raster_reset:
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
# Configure the global variables
d = {"scan_command" : self.abr_command}
d.update(self.abr_globals)
logger.info(d)
# Configure
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from super().pre_scan()
def stage(self):
""" ToDo: Sot sure if we should call super() here as it'd stage the whole beamline"""
return super().stage()
@@ -65,10 +110,13 @@ class MxFlyscanBase(AsyncFlyScanBase):
"""
self.pointID = 0
# Kick off the run
yield from self.stubs.command("abr", "start_command.set", 1)
yield from self.stubs.send_rpc_and_wait("abr", "start_command.set", 1)
logger.info("Measurement launched on the ABR stage...")
# ToDo: Wait for scan to finish
# Wait for grid scanner to finish
if self.abr_complete and self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
@@ -77,7 +125,7 @@ class MxFlyscanBase(AsyncFlyScanBase):
class MeasureStandardWedge(MxFlyscanBase):
class MeasureStandardWedge(AerotechFlyscanBase):
""" Standard wedge scan
Measure a standard continous line scan from `start` to `start` + `range`
@@ -105,33 +153,25 @@ class MeasureStandardWedge(MxFlyscanBase):
required_kwargs = ["start", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.MEASURE_STANDARD
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = float(self.caller_kwargs.get("scan_time"))
self.scan_ready_rate = float(self.caller_kwargs.get("ready_rate", 500))
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_ready_rate = parameter['kwargs'].get("ready_rate", 500)
self.abr_command = AbrCmd.MEASURE_STANDARD
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_scan_time,
"var_4" : self.scan_ready_rate,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from super().pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureScanSlits(MxFlyscanBase):
class MeasureScanSlits(AerotechFlyscanBase):
""" Slit scan
Measure a standard slit scan.
@@ -156,31 +196,23 @@ class MeasureScanSlits(MxFlyscanBase):
required_kwargs = ["delta_x", "delta_y", "velocity"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = AbrCmd.SLIT_SCAN
self.scan_delta_x = self.caller_kwargs.get("delta_x")
self.scan_delta_y = self.caller_kwargs.get("delta_y")
self.scan_velocity = self.caller_kwargs.get("velocity")
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.abr_command = AbrCmd.SLIT_SCAN
self.scan_delta_x = parameter['kwargs'].get("delta_x")
self.scan_delta_y = parameter['kwargs'].get("delta_y")
self.scan_velocity = parameter['kwargs'].get("velocity")
self.abr_globals = {
"var_1" : self.scan_delta_x,
"var_2" : self.scan_delta_y,
"var_3" : self.scan_velocity,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from super().pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureRasterSimple(MxFlyscanBase):
class MeasureRasterSimple(AerotechFlyscanBase):
""" Simple raster scan
Measure a simplified raster scan, assuming the goniometer is currently
@@ -210,23 +242,16 @@ class MeasureRasterSimple(MxFlyscanBase):
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = AbrCmd.RASTER_SCAN_SIMPLE
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset done flag
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_stepsize_x = parameter['kwargs'].get("cell_width")
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
self.scan_stepnum_y = parameter['kwargs'].get("nrows")
self.abr_command = AbrCmd.RASTER_SCAN_SIMPLE
self.abr_raster_reset = True
self.abr_globals = {
"var_1" : self.scan_exp_time,
"var_2" : self.scan_stepsize_x,
"var_3" : self.scan_stepsize_y,
@@ -236,17 +261,12 @@ class MeasureRasterSimple(MxFlyscanBase):
"var_7" : 0,
"var_8" : 0,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# ToDo: This line was left out
#self.raster._raster_num_rows = nrows
# Call super
yield from super().pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureSASTT(MxFlyscanBase):
class MeasureSASTT(AerotechFlyscanBase):
""" Small angle scanning tensor tomography scan
Measure a Small Angle Scanning Tensor Tomography scan on the specified
@@ -288,54 +308,41 @@ class MeasureSASTT(MxFlyscanBase):
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_version = int(self.caller_kwargs.get("version", 1))
# Set parameters from meaningful inputs
self.scan_version = int(parameter['kwargs'].get("version", 1))
if self.scan_version==1:
self.scan_command = AbrCmd.SCAN_SASTT
self.abr_command = AbrCmd.SCAN_SASTT
if self.scan_version==2:
self.scan_command = AbrCmd.SCAN_SASTT_V2
self.abr_command = AbrCmd.SCAN_SASTT_V2
if self.scan_version==3:
self.scan_command = AbrCmd.SCAN_SASTT_V3
self.abr_command = AbrCmd.SCAN_SASTT_V3
else:
raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
self.scan_even_tweak = self.caller_kwargs.get("even_tweak", 0)
self.scan_odd_tweak = self.caller_kwargs.get("off_tweak", 0)
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset done flag
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
# Configure the global variables
d = {"scan_command" : self.scan_command,
"var_1" : self.scan_exp_time,
"var_2" : self.scan_stepsize_x,
"var_3" : self.scan_stepsize_y,
"var_4" : self.scan_stepnum_x,
"var_5" : self.scan_stepnum_y,
"var_6" : self.scan_even_tweak,
"var_7" : self.scan_odd_tweak,
"var_8" : 0,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# ToDo: This line was left out
# self.raster._raster_num_rows = nrows
# Call super
yield from self.stubs.pre_scan()
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_stepsize_x = parameter['kwargs'].get("cell_width")
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
self.scan_stepnum_y = parameter['kwargs'].get("nrows")
self.scan_even_tweak = parameter['kwargs'].get("even_tweak", 0)
self.scan_odd_tweak = parameter['kwargs'].get("odd_tweak", 0)
self.abr_globals = {
'var_1': self.scan_exp_time,
'var_2': self.scan_stepsize_x,
'var_3': self.scan_stepsize_y,
'var_4': self.scan_stepnum_x,
'var_5': self.scan_stepnum_y,
'var_6': self.scan_even_tweak,
'var_7': self.scan_odd_tweak,
'var_8': 0,
}
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureVerticalLine(MxFlyscanBase):
class MeasureVerticalLine(AerotechFlyscanBase):
""" Vertical line scan using the GMY motor
Simple fly scan that records a single vertical line.
@@ -361,31 +368,23 @@ class MeasureVerticalLine(MxFlyscanBase):
required_kwargs = ["cell_time", "cell_height", "cell_num"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['gmy']
self.scan_command = AbrCmd.VERTICAL_LINE_SCAN
self.scan_cell_time = self.caller_kwargs.get("cell_time")
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
self.scan_stepnum_y = int(self.caller_kwargs.get("cell_num"))
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
"var_1" : self.scan_stepsize_y,
"var_2" : self.scan_stepnum_y,
"var_3" : self.scan_cell_time,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from super().pre_scan()
# Set parameters from meaningful inputs
self.scan_cell_time = parameter['kwargs'].get("cell_time")
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
self.scan_stepnum_y = parameter['kwargs'].get("cell_num")
self.abr_command = AbrCmd.VERTICAL_LINE_SCAN
self.abr_globals = {
'var_1': self.scan_cell_time,
'var_2': self.scan_stepsize_y,
'var_3': self.scan_stepnum_y,
}
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureScreening(MxFlyscanBase):
class MeasureScreening(AerotechFlyscanBase):
""" Sample screening scan
The scan itself is executed by the scan service running on the Aerotech
@@ -412,22 +411,16 @@ class MeasureScreening(MxFlyscanBase):
required_kwargs = ["start", "range", "scan_time", "degrees", "frames"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['gmy']
self.scan_command = AbrCmd.SCREENING
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_degrees = self.caller_kwargs.get("degrees")
self.scan_frames = self.caller_kwargs.get("frames")
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_degrees = parameter['kwargs'].get("degrees")
self.scan_frames = parameter['kwargs'].get("frames")
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
self.abr_command = AbrCmd.SCREENING
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_scan_time,
@@ -435,14 +428,12 @@ class MeasureScreening(MxFlyscanBase):
"var_5" : self.scan_frames,
"var_6" : self.scan_delta,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from super().pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureFastOmega(MxFlyscanBase):
class MeasureFastOmega(AerotechFlyscanBase):
""" Fast omega scan
MEasures a fast rotational scan with the rotation stage (omega).
@@ -469,33 +460,26 @@ class MeasureFastOmega(MxFlyscanBase):
required_kwargs = ["start", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.SUPER_FAST_OMEGA
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_rate = self.caller_kwargs.get("rate", 200)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
logger.info(parameter)
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_rate = parameter['kwargs'].get("rate", 200)
self.abr_command = AbrCmd.SUPER_FAST_OMEGA
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_scan_time,
"var_4" : self.scan_rate,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureStillWedge(MxFlyscanBase):
class MeasureStillWedge(AerotechFlyscanBase):
""" Still wedge scan
The scan itself is executed by the scan service running on the Aerotech
@@ -521,35 +505,27 @@ class MeasureStillWedge(MxFlyscanBase):
required_kwargs = ["start", "range", "exp_time", "oscrange"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.STILL_WEDGE
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_oscrange = self.caller_kwargs.get("oscrange")
self.scan_shutter_sleep = self.caller_kwargs.get("sleep_after_shutter_close", 0.01)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_oscrange = parameter['kwargs'].get("oscrange")
self.scan_shutter_sleep = parameter['kwargs'].get("sleep_after_shutter_close", 0.01)
self.abr_command = AbrCmd.STILL_WEDGE
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_scan_time,
"var_4" : self.scan_oscrange,
"var_5" : self.scan_shutter_sleep,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureStills(MxFlyscanBase):
class MeasureStills(AerotechFlyscanBase):
""" Still image sequence scan
Measures a series of still images without any motor movement.
@@ -575,31 +551,23 @@ class MeasureStills(MxFlyscanBase):
required_kwargs = ["exp_time", "nframes", "idle_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_command = AbrCmd.STILLS
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_num_frames = int(self.caller_kwargs.get("nframes"))
self.scan_idle_time = self.caller_kwargs.get("idle_time")
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_num_frames = parameter['kwargs'].get("nframes")
self.scan_idle_time = parameter['kwargs'].get("idle_time")
self.abr_command = AbrCmd.STILLS
self.abr_globals = {
"var_1" : self.scan_num_frames,
"var_2" : self.scan_exp_time,
"var_3" : self.scan_idle_time,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureSingleOscillation(MxFlyscanBase):
class MeasureSingleOscillation(AerotechFlyscanBase):
""" Single oscillation scan
The scan itself is executed by the scan service running on the Aerotech
@@ -626,35 +594,27 @@ class MeasureSingleOscillation(MxFlyscanBase):
required_kwargs = ["start", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.SINGLE_OSCILLATION
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_delta = self.caller_kwargs.get("delta", 0)
self.scan_settle = self.caller_kwargs.get("settle", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_delta = parameter['kwargs'].get("delta", 0)
self.scan_settle = parameter['kwargs'].get("settle", 0.5)
self.abr_command = AbrCmd.SINGLE_OSCILLATION
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_scan_time,
"var_4" : self.scan_delta,
"var_5" : self.scan_settle,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureRepeatedOscillation(MxFlyscanBase):
class MeasureRepeatedOscillation(AerotechFlyscanBase):
""" Repeated oscillation scan
The scan itself is executed by the scan service running on the Aerotech
@@ -681,35 +641,27 @@ class MeasureRepeatedOscillation(MxFlyscanBase):
required_kwargs = ["nframes", "range", "scan_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.REPEAT_SINGLE_OSCILLATION
self.scan_num_frames = self.caller_kwargs.get("nframes")
self.scan_range = self.caller_kwargs.get("range")
self.scan_scan_time = self.caller_kwargs.get("scan_time")
self.scan_sleep = self.caller_kwargs.get("sleep", 0)
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_num_frames = parameter['kwargs'].get("nframes")
self.scan_range = parameter['kwargs'].get("range")
self.scan_scan_time = parameter['kwargs'].get("scan_time")
self.scan_sleep = parameter['kwargs'].get("sleep", 0)
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
self.abr_command = AbrCmd.REPEAT_SINGLE_OSCILLATION
self.abr_globals = {
"var_1" : self.scan_num_frames,
"var_2" : self.scan_scan_time,
"var_3" : self.scan_range,
"var_4" : self.scan_sleep,
"var_5" : self.scan_delta,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureMSOX(MxFlyscanBase):
class MeasureMSOX(AerotechFlyscanBase):
""" Standard MSOX scan
The scan itself is executed by the scan service running on the Aerotech
@@ -736,24 +688,16 @@ class MeasureMSOX(MxFlyscanBase):
required_kwargs = ["start", "range", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.MSOX
self.scan_start = self.caller_kwargs.get("start")
self.scan_range = self.caller_kwargs.get("range")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_num_datasets = int(self.caller_kwargs.get("num_datasets", 1))
self.scan_rest_time = self.caller_kwargs.get("rest_time", 0)
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_start = parameter['kwargs'].get("start")
self.scan_range = parameter['kwargs'].get("range")
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_num_datasets = parameter['kwargs'].get("num_datasets", 1)
self.scan_rest_time = parameter['kwargs'].get("rest_time", 0)
self.abr_command = AbrCmd.MSOX
self.abr_reset = True
self.abr_globals = {
"var_1" : self.scan_start,
"var_2" : self.scan_range,
"var_3" : self.scan_exp_time,
@@ -765,14 +709,12 @@ class MeasureMSOX(MxFlyscanBase):
"var_9" : 0,
"var_10" : 0,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
# Call base class
super().__init__(parameter=parameter, **kwargs)
class MeasureGrid(MxFlyscanBase):
class MeasureGrid(AerotechFlyscanBase):
""" General grid scan
Note: This was probably never used in its current form, because the
@@ -799,27 +741,30 @@ class MeasureGrid(MxFlyscanBase):
scan_name = "measure_raster"
required_kwargs = ["positions", "ncols", "angle", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
# Default values
scan_start_x = None
scan_end_x = None
scan_xstep = 0.010
scan_ystep = 0.010
scan_zstep = 0.010
scan_gmy_step = 0.010
self.scan_command = AbrCmd.RASTER_SCAN
self.scan_positions = self.caller_kwargs.get("positions")
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
def __init__(self, *args, parameter: dict = None, **kwargs):
self.axis = ['omega']
# Set parameters from meaningful inputs
self.abr_command = AbrCmd.RASTER_SCAN
self.abr_reset = True
self.scan_positions = parameter['kwargs'].get("positions")
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x
self.scan_angle = self.caller_kwargs.get("angle")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_angle = parameter['kwargs'].get("angle")
self.scan_exp_time = parameter['kwargs'].get("exp_time")
if len(self.scan_positions)==1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
# Good practice: new members only in __init__
self.scan_start_x = None
self.scan_end_x = None
self.scan_xstep = 0.010
self.scan_ystep = 0.010
self.scan_zstep = 0.010
self.scan_gmy_step = 0.010
# Call base class
super().__init__(parameter=parameter, **kwargs)
def prepare_positions(self):
# Calculate step sizes
@@ -851,14 +796,8 @@ class MeasureGrid(MxFlyscanBase):
# self._raster_current_row = 0
# self._raster_num_rows = rows
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Configure the global variables
d = {"scan_command" : self.scan_command,
self.abr_globals = {
"var_1" : self.scan_omega_position,
"var_2" : self.scan_start_x,
"var_3" : self.scan_end_x,
@@ -870,16 +809,10 @@ class MeasureGrid(MxFlyscanBase):
"var_9" : self.scan_gmx_offset,
"var_10" : self.scan_gmy_step,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
class MeasureGridStill(MxFlyscanBase):
class MeasureGridStill(AerotechFlyscanBase):
""" Still grid scan
Note: This was probably never used in its current form, because the
@@ -908,27 +841,29 @@ class MeasureGridStill(MxFlyscanBase):
scan_name = "measure_raster_still"
required_kwargs = ["positions", "columns", "exp_time", "delay"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
# Default values
scan_start_x = None
scan_end_x = None
scan_xstep = 0.010
scan_ystep = 0.010
scan_zstep = 0.010
scan_gmy_step = 0.010
self.scan_command = AbrCmd.RASTER_SCAN_STILL
self.scan_positions = self.caller_kwargs.get("positions")
self.scan_stepnum_x = int(self.caller_kwargs.get("columns"))
def __init__(self, *args, parameter: dict = None, **kwargs):
self.axis = ['omega']
# Set parameters from meaningful inputs
self.abr_command = AbrCmd.RASTER_SCAN_STILL
self.abr_reset = True
self.scan_positions = parameter['kwargs'].get("positions")
self.scan_stepnum_x = parameter['kwargs'].get("columns")
self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_delay = self.caller_kwargs.get("delay")
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_delay = parameter['kwargs'].get("delay")
if len(self.scan_positions)==1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
# Good practice: new members only in __init__
self.scan_start_x = None
self.scan_end_x = None
self.scan_xstep = 0.010
self.scan_ystep = 0.010
self.scan_zstep = 0.010
self.scan_gmy_step = 0.010
# Call base class
super().__init__(parameter=parameter, **kwargs)
def prepare_positions(self):
# Calculate step sizes
@@ -960,14 +895,7 @@ class MeasureGridStill(MxFlyscanBase):
# self._raster_current_row = 0
# self._raster_num_rows = rows
def pre_scan(self):
# ToDo: Move roughly to start position
# Reset the scan engine
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Configure the global variables
d = {"scan_command" : self.scan_command,
self.abr_globals = {
"var_1" : self.scan_omega_position,
"var_2" : self.scan_start_x,
"var_3" : self.scan_end_x,
@@ -979,14 +907,10 @@ class MeasureGridStill(MxFlyscanBase):
"var_9" : self.scan_gmx_offset,
"var_10" : self.scan_gmy_step,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
class MeasureJungfrau(MxFlyscanBase):
class MeasureJungfrau(AerotechFlyscanBase):
""" Scan with the Jungfrau detector
The scan itself is executed by the scan service running on the Aerotech
@@ -1015,24 +939,19 @@ class MeasureJungfrau(MxFlyscanBase):
required_kwargs = ["columns", "rows", "width", "height", "exp_time"]
def __init__(self, *args, parameter: dict = None, **kwargs):
super().__init__(parameter=parameter, **kwargs)
self.axis = ['omega']
self.scan_command = AbrCmd.JUNGFRAU
self.scan_num_cols = self.caller_kwargs.get("columns")
self.scan_num_rows = self.caller_kwargs.get("rows")
self.scan_width = self.caller_kwargs.get("width")
self.scan_height = self.caller_kwargs.get("height")
self.scan_exp_time = self.caller_kwargs.get("exp_time")
self.scan_sleep = self.caller_kwargs.get("sleep", 0.1)
self.scan_timeout = None if self.scan_sleep <= 0 else 5.0 + (self.scan_num_rows * self.scan_sleep) + (self.scan_num_cols * self.scan_num_rows * self.scan_exp_time)
def pre_scan(self):
# ToDo: Move roughly to start position
# Configure the global variables
d = {"scan_command" : self.scan_command,
# Set parameters from meaningful inputs
self.scan_num_cols = parameter['kwargs'].get("columns")
self.scan_num_rows = parameter['kwargs'].get("rows")
self.scan_width = parameter['kwargs'].get("width")
self.scan_height = parameter['kwargs'].get("height")
self.scan_exp_time = parameter['kwargs'].get("exp_time")
self.scan_sleep = parameter['kwargs'].get("sleep", 0.1)
self.abr_timeout = None if self.scan_sleep <= 0 else 5.0 + (self.scan_num_rows * self.scan_sleep) + (self.scan_num_cols * self.scan_num_rows * self.scan_exp_time)
self.abr_command = AbrCmd.JUNGFRAU
self.abr_reset = True
self.abr_complete = True
self.abr_globals = {
"var_1" : self.scan_num_cols,
"var_2" : self.scan_num_rows,
"var_3" : self.scan_width,
@@ -1041,23 +960,14 @@ class MeasureJungfrau(MxFlyscanBase):
"var_6" : self.scan_sleep,
"var_7" : 0,
"var_8" : 0,
}
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
# Call super
yield from self.stubs.pre_scan()
}
# Call base class
super().__init__(parameter=parameter, **kwargs)
def scan_core(self):
""" The actual scan logic comes here.
"""
self.pointID = 0
# Kick off the run
yield from self.stubs.command("abr", "start_command.set", 1)
logger.info("Measurement launched on the ABR stage...")
# Wait for grid scanner to finish
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.scan_timeout)
st.wait()
yield from super().scan_core()
# Go back to direct mode
st = yield from self.stubs.send_rpc_and_wait("abr", "set_axis_mode", "direct")