From 13c6d7b8fb91a54fef9c00aeb9275650b49639e2 Mon Sep 17 00:00:00 2001 From: Unknown MX Person Date: Tue, 27 Aug 2024 17:58:33 +0200 Subject: [PATCH] Starting to move things to scans --- pxiii_bec/devices/A3200.py | 222 +++----- pxiii_bec/devices/aeroA3200.py | 58 +- pxiii_bec/scans/mx_measurements.py | 877 +++++++++++++++++++++++++++++ 3 files changed, 1007 insertions(+), 150 deletions(-) create mode 100644 pxiii_bec/scans/mx_measurements.py diff --git a/pxiii_bec/devices/A3200.py b/pxiii_bec/devices/A3200.py index 2788996..9109243 100644 --- a/pxiii_bec/devices/A3200.py +++ b/pxiii_bec/devices/A3200.py @@ -103,16 +103,12 @@ Examples """ import time import math -from beamline import BEAMLINE -from epics import PV, poll -from epics.ca import pend_io - from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind -from ophyd.status import Status +from ophyd.status import MoveStatus, SubscriptionStatus, DeviceStatus -class A3200Axis(PVPositioner): +from aeroA3200 import A3200Axis, A3200RasterScanner, A3200Oscillator ABR_DONE = 0 @@ -131,17 +127,6 @@ SHUTTER_OPEN = 1 FULL_PERIOD = 0 HALF_PERIOD = 1 - -def pv_wait(pv, val, timeout=10.0): - timeout = timeout + time.time() - timeisup = False - while val != pv.get() and not timeisup: - poll(0.01) - timeisup = time.time() > timeout - if timeisup: - raise RuntimeWarning("timeout waiting for %s to reach %s" % (pv.pvname, str(val))) - - CMD_NONE = 0 CMD_RASTER_SCAN_SIMPLE = 1 CMD_MEASURE_STANDARD = 2 @@ -196,9 +181,9 @@ class AerotechAbrStage(Device): gmz = Component(A3200Axis, "-ES-DF1:GMZ", kind=Kind.hinted) - _zcmd = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted) - _start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) - _stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) + scan_command = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted) + start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) + stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) _var_1 = Component(EpicsSignal, "-ES-PSO:VAR-1", put_complete=True, kind=Kind.omitted) _var_2 = Component(EpicsSignal, "-ES-PSO:VAR-2", put_complete=True, kind=Kind.omitted) @@ -216,23 +201,17 @@ class AerotechAbrStage(Device): raster_num_rows = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config) - def __init__(self): + # def __init__(self): + # self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS") + # self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE") + # self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME") + # self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC") + # self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START") + # self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE") - self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS") - self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE") - self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME") - self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC") - self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START") - self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE") - - pend_io(10) + # pend_io(10) - def command(self, cmd): - self._zcmd.set(cmd).wait() - - def start_command(self): - self._start_command.set(1).wait() def measure_standard(self, start, wedge, etime, ready_rate=500.0): """measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds""" @@ -555,19 +534,48 @@ class AerotechAbrStage(Device): self.start_command() + def configure(self, d: dict) -> tuple: - def configure_stills(self, frames, etime, sleep=0.0): - self._var_1.set(frames).wait() - self._var_2.set(etime).wait() - self._var_3.set(sleep).wait() + old = self.read_configuration() + + # ToDo: Check if idle before reconfiguring + + # Set the corresponding global variables + if "scan_command" in d: + self.scan_command.set(d['scan_command']).wait() + if "var_1" in d: + self._var_1.set(d['var_1']).wait() + if "var_2" in d: + self._var_2.set(d['var_2']).wait() + if "var_3" in d: + self._var_3.set(d['var_3']).wait() + if "var_4" in d: + self._var_4.set(d['var_4']).wait() + if "var_5" in d: + self._var_5.set(d['var_5']).wait() + if "var_6" in d: + self._var_6.set(d['var_6']).wait() + if "var_7" in d: + self._var_7.set(d['var_7']).wait() + if "var_8" in d: + self._var_8.set(d['var_8']).wait() + if "var_9" in d: + self._var_9.set(d['var_9']).wait() + if "var_10" in d: + self._var_10.set(d['var_10']).wait() + + new = self.read_configuration() + return old, new + + def complete(self, timeout=None) -> DeviceStatus: + """ ToDo: WTF was this device doing here? Whatever... + """ + return self.raster.complete() def reset(self): self.command(CMD_NONE) self.set_direct_mode() - def raster_2d(self): - pass - def is_ioc_ok(self): return 0 == self.status.get() @@ -742,78 +750,7 @@ class AerotechAbrStage(Device): print(" --- trying start again.") self.osc.taskStart.set(1).wait() - def velo(self, axis: int, speed: float = None) -> float: - """set's the speed on the given axis to speed""" - if AXIS_GMX == axis: - velo_pv = self.__pv_gmx_velo - min, max = 0.01, 100.0 - elif AXIS_GMY == axis: - velo_pv = self.__pv_gmy_velo - min, max = 0.01, 10.0 - elif AXIS_GMZ == axis: - velo_pv = self.__pv_gmz_velo - min, max = 0.01, 10.0 - elif AXIS_OMEGA == axis: - velo_pv = self.__pv_omega_velo - min, max = 0.01, 720.0 - else: - raise Exception("unknown axis index") - - if speed is not None: - if not (min <= speed <= max): - raise Exception(f"requested speed {speed} outside range {min} < speed < {max}") - velo_pv.put(speed) - poll(0.05) - return velo_pv.get() - - - def select_axis(self, axis): - """returns PVs and limits for the given axis - - axis: Aerotech.AXIS_{OMEGA=1, GMX=2, GMY=3, GMZ=4} - - return (val_pv, rbv_pv, velo_pv, done_pv, velomin, velomax) - """ - - if AXIS_GMX == axis: - val_pv = self.__pv_gmx_val - rbv_pv = self.__pv_gmx_rbv - velo_pv = self.__pv_gmx_velo - done_pv = self.__pv_gmx_done - min, max = 0.01, 100.0 - elif AXIS_GMY == axis: - val_pv = self.__pv_gmy_val - rbv_pv = self.__pv_gmy_rbv - velo_pv = self.__pv_gmy_velo - done_pv = self.__pv_gmy_done - min, max = 0.01, 10.0 - elif AXIS_GMZ == axis: - val_pv = self.__pv_gmz_val - rbv_pv = self.__pv_gmz_rbv - velo_pv = self.__pv_gmz_velo - done_pv = self.__pv_gmz_done - min, max = 0.01, 10.0 - elif AXIS_OMEGA == axis: - val_pv = self.__pv_omega_set - rbv_pv = self.__pv_omega_get - velo_pv = self.__pv_omega_velo - done_pv = self.__pv_omega_done - min, max = 0.01, 720.0 - else: - raise Exception("unknown axis index") - - return (val_pv, rbv_pv, velo_pv, done_pv, min, max) - - - - def rbv(self): - x = self.__pv_gmx_rbv.get() - y = self.__pv_gmy_get.get() - z = self.__pv_gmz_get.get() - omega = self.__pv_omega_get.get() - return {"gmx": x, "gmy": y, "gmz": z, "omega": omega} - - def move(self, position, wait=False, velo=None, direct=False, **kwargs) -> StatusBase: + def move(self, position, wait=False, velocity=None, relative=None, direct=False, **kwargs) -> MoveStatus: """Move omega to `angle` at `speed` deg/s and `wait` (or not). If a `speed` is specified, it will be used for the movement but the speed @@ -832,10 +769,10 @@ class AerotechAbrStage(Device): Limits: 0 - 360 `wait` [boolean] wait or not for movement to finish. """ - return self.omega.move(position, wait=wait, velo=velo, direct=direct, **kwargs) + return self.omega.move(position, wait=wait, velocity=velocity, relative=relative, direct=direct, **kwargs) def set_shutter(self, state): - if self.__pv_axes_mode.get(): + if self.axisAxesMode.get(): print("ABR is not in direct mode; cannot manipulate shutter") return False state = str(state).lower() @@ -846,39 +783,42 @@ class AerotechAbrStage(Device): state = 1 elif state == ["0", "closed"]: state = 0 - self.__pv_shutter_set.put(state, wait=True) - return state == self.__pv_shutter_get.get() + self._shutter.put(state, wait=True) + return state == self._shutter.get() if __name__ == "__main__": - import argparse - from time import sleep + aero = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr") + aero.wait_for_connection() - parser = argparse.ArgumentParser(description="aerotech client") - parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true") - parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true") - parser.add_argument("--measure-standard", help="measure a standard dataset", action="store") - parser.add_argument("--omega", help="move omega to...", action="store") - parser.add_argument("--gmx", help="move gmx to...", action="store") - parser.add_argument("--gmy", help="move gmy to...", action="store") - parser.add_argument("--gmz", help="move gmz to...", action="store") - args = parser.parse_args() + # import argparse + # from time import sleep - abr = Abr() - if args.stop: - abr.stop() + # parser = argparse.ArgumentParser(description="aerotech client") + # parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true") + # parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true") + # parser.add_argument("--measure-standard", help="measure a standard dataset", action="store") + # parser.add_argument("--omega", help="move omega to...", action="store") + # parser.add_argument("--gmx", help="move gmx to...", action="store") + # parser.add_argument("--gmy", help="move gmy to...", action="store") + # parser.add_argument("--gmz", help="move gmz to...", action="store") + # args = parser.parse_args() + + # abr = Abr() + # if args.stop: + # abr.stop() - elif args.sanitize_speeds: - abr.velo(AXIS_GMX, 50.0) - abr.velo(AXIS_GMY, 10.0) - abr.velo(AXIS_GMZ, 10.0) - abr.velo(AXIS_OMEGA, 180.0) + # elif args.sanitize_speeds: + # abr.velo(AXIS_GMX, 50.0) + # abr.velo(AXIS_GMY, 10.0) + # abr.velo(AXIS_GMZ, 10.0) + # abr.velo(AXIS_OMEGA, 180.0) - elif args.gmx: - abr.move_x(args.gmx) + # elif args.gmx: + # abr.move_x(args.gmx) - elif args.measure_standard: - start_angle, total_range, total_time = args.measure_standard.split(",") - abr.measure_standard(start_angle, total_range, total_time) + # elif args.measure_standard: + # start_angle, total_range, total_time = args.measure_standard.split(",") + # abr.measure_standard(start_angle, total_range, total_time) diff --git a/pxiii_bec/devices/aeroA3200.py b/pxiii_bec/devices/aeroA3200.py index 18ff5b7..11a2bcd 100644 --- a/pxiii_bec/devices/aeroA3200.py +++ b/pxiii_bec/devices/aeroA3200.py @@ -5,8 +5,19 @@ Created on Tue Jun 11 11:28:38 2024 @author: mohacsi_i """ -from ophyd import Component, PVPositioner, Device, EpicsSignal, EpicsSignalRO, Kind -from ophyd.status import Status +from ophyd import Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind +from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus + + +ABR_DONE = 0 +ABR_READY = 1 +ABR_BUSY = 2 + +GRID_SCAN_BUSY = 0 +GRID_SCAN_DONE = 1 + +DIRECT_MODE = 0 +MEASURING_MODE = 1 class A3200Axis(PVPositioner): @@ -21,7 +32,7 @@ class A3200Axis(PVPositioner): setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config) #setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config) - velo = Component(EpicsSignal, "-SETV", kind=Kind.config) + velocity = Component(EpicsSignal, "-SETV", kind=Kind.config) status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config) # PV to issue native relative movements on the A3200 relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config) @@ -30,9 +41,9 @@ class A3200Axis(PVPositioner): ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config) # HW status words - dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.hinted) - ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.hinted) - fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.hinted) + dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.normal) + ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.normal) + fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.normal) # Rock movement _rock = Component(EpicsSignal, "-ROCK", put_complete=True, kind=Kind.config) @@ -56,9 +67,12 @@ class A3200Axis(PVPositioner): self.vmax.set(vmax).wait() - def move(self, position, wait=True, velo=None, direct=False, **kwargs): + def move(self, position, velocity=None, wait=True, relative=False, direct=False, **kwargs) -> MoveStatus: """ Native absolute movement on the A3200 """ + # ToDo: Ensure we can stop (if a stop_signal is configured). + + # Check if we're in direct movement mode if self.parent is not None: if self.parent.mode != DIRECT_MODE: if direct: @@ -66,8 +80,13 @@ class A3200Axis(PVPositioner): else: raise RuntimeError("ABR is not in direct mode!") - if velo is not None: - self.velo.set(velo).wait() + # Set velocity if provided + if velocity is not None: + self.velo.set(velocity).wait() + + # Set up relative motion + if relative: + raise NotImplementedError("Relative movement is not yet supported by the ophyd device") return super().move(position, wait=wait, **kwargs) @@ -299,6 +318,27 @@ class A3200RasterScanner(Device): return status + def complete(self, timeout=None) -> DeviceStatus: + if timeout is not None: + # Define wait until the busy flag goes down (excluding initial update) + timestamp_ = 0 + def isReady(*args, old_value, value, timestamp, **kwargs): + nonlocal timestamp_ + result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE) + print(f"Old {old_value}\tNew: {value}\tResult: {result}") + timestamp_ = timestamp + return result + + # Subscribe and wait for update + status = SubscriptionStatus(self.scan_done, isReady, timeout=timeout, settle_time=0.5) + return status + status = DeviceStatus(self, settle_time=0.5) + status.set_finished() + return status + + + + diff --git a/pxiii_bec/scans/mx_measurements.py b/pxiii_bec/scans/mx_measurements.py new file mode 100644 index 0000000..5208f8f --- /dev/null +++ b/pxiii_bec/scans/mx_measurements.py @@ -0,0 +1,877 @@ + ++ 119 +− 0 +import time + +import numpy as np + +from bec_lib import bec_logger +from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase + +logger = bec_logger.logger + + +CMD_NONE = 0 +CMD_RASTER_SCAN_SIMPLE = 1 +CMD_MEASURE_STANDARD = 2 +CMD_VERTICAL_LINE_SCAN = 3 +CMD_SCREENING = 4 +CMD_SUPER_FAST_OMEGA = 5 +CMD_STILL_WEDGE = 6 +CMD_STILLS = 7 +CMD_REPEAT_SINGLE_OSCILLATION = 8 +CMD_SINGLE_OSCILLATION = 9 +CMD_OLD_FASHIONED = 10 +CMD_RASTER_SCAN = 11 +CMD_JET_ROTATION = 12 +CMD_X_HELICAL = 13 +CMD_X_RUNSEQ = 14 +CMD_JUNGFRAU = 15 +CMD_MSOX = 16 +CMD_SLIT_SCAN = 17 +CMD_RASTER_SCAN_STILL = 18 +CMD_SCAN_SASTT = 19 +CMD_SCAN_SASTT_V2 = 20 +CMD_SCAN_SASTT_V3 = 21 + + +class MxFlyscanBase(AsyncFlyScanBase): + """ Base class for MX flyscans + """ + scan_report_hint = "table" + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ Just set num_pos=0 to avoid hanging + """ + self.num_pos = 0 + super().__init__(parameter=parameter, **kwargs) + + def stage(self): + """ ToDo: Sot sure if we should call super() here as it'd stage the whole beamline""" + return super().stage() + + def scan_core(self): + """ The actual scan logic comes here. + """ + self.pointID = 0 + # Kick off the run + yield from self.stubs.command("abr", "start_command.set", 1) + + logger.info("Measurement launched on the ABR stage...") + # ToDo: Wait for scan to finish + + def cleanup(self): + """Set scan progress to 1 to finish the scan""" + self.num_pos = 1 + return super().cleanup() + + + + + + +class MeasureStandardWedge(MxFlyscanBase): + """ Measure a standard wedge scan + """ + scan_name = "standard_wedge" + required_kwargs = ["start", "range"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ Standard measurement scan + + Measure a standard wedge from `start` to `start` + `wedge` during + `etime` on the Omega axis. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.standard_wedge(start=42, range=10, scan_time=20) + + Parameters + ---------- + start : (float, float) + Scan start position of the axis. + range : (float, float) + Scan range of the axis. + scan_time : (float) + Total scan time for the movement [s]. + ready_rate : float, optional + No clue what is this... (default=500) + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_MEASURE_STANDARD + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = float(self.caller_kwargs.get("scan_time", 5)) + self.scan_ready_rate = float(self.caller_kwargs.get("ready_rate", 500)) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_start) + yield from self.stubs.command("abr", "_var_2.set", self.scan_range) + yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) + yield from self.stubs.command("abr", "_var_4.set", self.scan_ready_rate) + + # Call super + yield from self.stubs.pre_scan() + + +class MeasureScanSlits(MxFlyscanBase): + """ Measure a slit scan + """ + scan_name = "slit_scan" + required_kwargs = ["start", "range"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ Slit scan + + Measure a standard slit scan. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.slit_scan(start=42, range=10, scan_time=20) + + Parameters + ---------- + delta_x : (float, float) + Scan range with slit in x. + delta_y : (float, float) + Scan range with slit in y. + velocity : (float) + Scan velocity [mm/s]]. + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + + self.scan_command = CMD_SLIT_SCAN + self.scan_delta_x = self.caller_kwargs.get("delta_x") + self.scan_delta_y = self.caller_kwargs.get("delta_y") + self.scan_velocity = self.caller_kwargs.get("velocity") + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_delta_x) + yield from self.stubs.command("abr", "_var_2.set", self.scan_delta_y) + yield from self.stubs.command("abr", "_var_3.set", self.scan_velocity) + + # Call super + yield from self.stubs.pre_scan() + + +class MeasureRasterSimple(MxFlyscanBase): + """ Measure a grid scan + """ + scan_name = "raster_simple" + required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ Simnple raster scan + + Measure a simplified raster scan, assuming the goniometer is currently + at the CENTER of TOP-LEFT CELL. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.raster_simple(exp_time=0.5, cell_width=0.2, cell_height=0.2, ncols=10, nrows=10) + + Parameters + ---------- + exp_time : (float) + Exposure time per point [s]. + cell_width : (float) + Scan step size [mm]. + cell_height : (float) + Scan step size [mm]. + ncols : (int) + Number of scan steps. + nrows : (int) + Number of scan steps. + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + + self.scan_command = CMD_RASTER_SCAN_SIMPLE + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_stepsize_x = self.caller_kwargs.get("cell_width") + self.scan_stepsize_y = self.caller_kwargs.get("cell_height") + self.scan_stepnum_x = int(self.caller_kwargs.get("ncols")) + self.scan_stepnum_y = int(self.caller_kwargs.get("nrows")) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "raster_scan_done.set", 0) + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time) + yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x) + yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y) + yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x) + yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y) + yield from self.stubs.command("abr", "_var_6.set", 0) + yield from self.stubs.command("abr", "_var_7.set", 0) + yield from self.stubs.command("abr", "_var_8.set", 0) + + # ToDo: This line was left out + #self.raster._raster_num_rows = nrows + + # Call super + yield from self.stubs.pre_scan() + + +class MeasureSASTT(MxFlyscanBase): + """ Measure a small angle scanning tensor tomography scan + """ + scan_name = "measure_sastt" + required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ Standard measurement scan + + Measure a simplified grid scan, assuming the goniometer is currently + at the CENTER of the required grid. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_sastt(exp_time=1.0, cell_width=0.01, cell_height=0.01, ncols=100, nrows=100) + + Parameters + ---------- + exp_time : (float) + Exposure time per point [s]. + cell_width : (float) + Scan step size [mm]. + cell_height : (float) + Scan step size [mm]. + ncols : (int) + Number of scan steps. + nrows : (int) + Number of scan steps. + odd_tweak : (float) + Distance to be added before the open shutter command [mm]. + Used only in scan version=3. Should be small (default: 0)! + even_tweak : (float) + Distance to be added before the open shutter command [mm]. + Used only in scan version=3. Should be small (default: 0)! + version : (int) + Scanning mode for tensor tomo. + 1 = original snake scan, single PSO window + 2 = scan always from LEFT---RIGHT + 3 = snake scan alternating PSO window for even/odd rows + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + + self.scan_version = int(self.caller_kwargs.get("version", 1)) + if self.scan_version==1: + self.scan_command = CMD_SCAN_SASTT + if self.scan_version==2: + self.scan_command = CMD_SCAN_SASTT_V2 + if self.scan_version==3: + self.scan_command = CMD_SCAN_SASTT_V3 + else: + raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}") + + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_stepsize_x = self.caller_kwargs.get("cell_width") + self.scan_stepsize_y = self.caller_kwargs.get("cell_height") + self.scan_stepnum_x = int(self.caller_kwargs.get("ncols")) + self.scan_stepnum_y = int(self.caller_kwargs.get("nrows")) + self.scan_even_tweak = self.caller_kwargs.get("even_tweak", 0) + self.scan_odd_tweak = self.caller_kwargs.get("off_tweak", 0) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "raster_scan_done.set", 0) + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time) + yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x) + yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y) + yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x) + yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y) + yield from self.stubs.command("abr", "_var_6.set", self.scan_even_tweak) + yield from self.stubs.command("abr", "_var_7.set", self.scan_odd_tweak) + yield from self.stubs.command("abr", "_var_8.set", 0) + + # ToDo: This line was left out + # self.raster._raster_num_rows = nrows + + # Call super + yield from self.stubs.pre_scan() + + +class MeasureVerticalLine(MxFlyscanBase): + """ Measure a vertical line using the GMY motor + + Simple fly scan that performs a single scan vertically. + The line is nummCells * cellHeight long and the exposureTime is per cell. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_vertical_line(start=42, range=10, scan_time=20) + + Parameters + ---------- + cell_height : (float) + Step size [mm]. + nrows : (int) + Scan range of the axis. + exp_time : (float) + Exposure time per cell [s] + """ + scan_name = "measure_vertical_line" + required_kwargs = ["exp_time", "cell_height", "nrows"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['gmy'] + + self.scan_command = CMD_VERTICAL_LINE_SCAN + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_stepsize_y = self.caller_kwargs.get("cell_height") + self.scan_stepnum_y = int(self.caller_kwargs.get("nrows")) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_stepsize_y) + yield from self.stubs.command("abr", "_var_2.set", self.scan_stepnum_y) + yield from self.stubs.command("abr", "_var_3.set", self.scan_exp_time) + + # Call super + yield from self.stubs.pre_scan() + + +class MeasureScreening(MxFlyscanBase): + """ Measure a sample screening program + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_screening(start=42, range=10, scan_time=20, degrees=180, frames=1800) + + Parameters + ---------- + start : (float) + Scan start position of the axis. + range : (float) + Scan range of the axis. + scan_time : (float) + Total scan time for the movement [s]. + degrees : (???) + frames : (???) + delta : (???) + (default: 0.5). + + """ + scan_name = "measure_screening" + required_kwargs = ["start", "range", "scan_time", "degrees", "frames"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['gmy'] + + self.scan_command = CMD_SCREENING + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = self.caller_kwargs.get("scan_time") + self.scan_degrees = self.caller_kwargs.get("degrees") + self.scan_frames = self.caller_kwargs.get("frames") + self.scan_delta = self.caller_kwargs.get("delta", 0.5) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_start) + yield from self.stubs.command("abr", "_var_2.set", self.scan_range) + yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) + yield from self.stubs.command("abr", "_var_4.set", self.scan_degrees) + yield from self.stubs.command("abr", "_var_5.set", self.scan_frames) + yield from self.stubs.command("abr", "_var_6.set", self.scan_delta) + + + # Call super + yield from self.stubs.pre_scan() + + + + + +class MeasureFastOmega(MxFlyscanBase): + """ Measure a fast omega scan + + MEasures a fast rotational scan with the rotation stage (omega). + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_fast_omega(start=42, range=180, scan_time=20) + + Parameters + ---------- + start : (float) + Scan start position of the axis. + range : (float) + Scan range of the axis. + scan_time : (float) + Total scan time for the movement [s]. + rate : (???) + (default: 200). + """ + scan_name = "measure_fast_omega" + required_kwargs = ["start", "range", "exp_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_SUPER_FAST_OMEGA + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = self.caller_kwargs.get("scan_time") + self.scan_rate = self.caller_kwargs.get("rate", 200) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_start) + yield from self.stubs.command("abr", "_var_2.set", self.scan_range) + yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) + yield from self.stubs.command("abr", "_var_4.set", self.scan_rate) + + # Call super + yield from self.stubs.pre_scan() + + + + +class MeasureStillWedge(MxFlyscanBase): + """ Measure a still wedge scan + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_still_wedge(start=42, range=90, scan_time=20, oscrange=15) + + Parameters + ---------- + start : (float) + Scan start position of the axis. + range : (float) + Scan range of the axis. + scan_time : (float) + Total scan time for the movement [s]. + oscrange : (float) + sleep_after_shutter_close : (float) + [s] (default: 0.01). + """ + scan_name = "measure_still_wedge" + required_kwargs = ["start", "range", "exp_time", "oscrange"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_STILL_WEDGE + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = self.caller_kwargs.get("scan_time") + self.scan_oscrange = self.caller_kwargs.get("oscrange") + self.scan_shutter_sleep = self.caller_kwargs.get("sleep_after_shutter_close", 0.01) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_start) + yield from self.stubs.command("abr", "_var_2.set", self.scan_range) + yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) + yield from self.stubs.command("abr", "_var_4.set", self.scan_oscrange) + yield from self.stubs.command("abr", "_var_5.set", self.scan_shutter_sleep) + + # Call super + yield from self.stubs.pre_scan() + + + + + + +class MeasureStills(MxFlyscanBase): + """ Measure a still image sequence + + Measures a series of still images without any motor movement. + IMPORTANT: use idle_time=0.0 to prevent shutter open/close + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_stils(nframes=100, exp_time=0.1, idle_time=30) + + Parameters + ---------- + nframes : (int) + Total number of frames to be recorded. + exposure_time : (float) + Exposure time of each frame [s]. + idle_time : (float) + Sleep time between the frames [s]. + """ + scan_name = "measure_stils" + required_kwargs = ["exp_time", "nframes"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + + self.scan_command = CMD_STILLS + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_num_frames = int(self.caller_kwargs.get("nframes")) + self.scan_idle_time = self.caller_kwargs.get("idle_time") + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_num_frames) + yield from self.stubs.command("abr", "_var_2.set", self.scan_exp_time) + yield from self.stubs.command("abr", "_var_3.set", self.scan_idle_time) + + # Call super + yield from self.stubs.pre_scan() + + + + + + +class MeasureSingleOscillation(MxFlyscanBase): + """ Measure a single oscillation + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_single_osc(start=42, range=180, scan_time=20) + + Parameters + ---------- + start : (float) + Scan start position of the axis. + range : (float) + Oscillation range of the axis. + scan_time : (float) + Total scan time for the movement [s]. + delta : (???) + (default: 0). + settle : (???) + (default: 0.5). + """ + scan_name = "measure_single_osc" + required_kwargs = ["start", "range", "scan_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_SINGLE_OSCILLATION + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = self.caller_kwargs.get("scan_time") + self.scan_delta = self.caller_kwargs.get("delta", 0) + self.scan_settle = self.caller_kwargs.get("settle", 0.5) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Set the global variables + yield from self.stubs.command("abr", "scan_command.set", self.scan_command) + yield from self.stubs.command("abr", "_var_1.set", self.scan_start) + yield from self.stubs.command("abr", "_var_2.set", self.scan_range) + yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) + yield from self.stubs.command("abr", "_var_4.set", self.scan_delta) + yield from self.stubs.command("abr", "_var_5.set", self.scan_settle) + + # Call super + yield from self.stubs.pre_scan() + + + + + +class MeasureRepeatedOscillation(MxFlyscanBase): + """ Measure oscillation repeatedly + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_multi_osc(start=42, range=180, scan_time=20) + + Parameters + ---------- + nframes : (int) + ??? + scan_time : (float) + Total scan time for the movement [s]. + range : (float) + Oscillation range of the axis. + settle : (???) + (default: 0). + delta : (???) + (default: 0.5). + + """ + scan_name = "measure_multi_osc" + required_kwargs = ["start", "range", "scan_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_REPEAT_SINGLE_OSCILLATION + self.scan_num_frames = self.caller_kwargs.get("nframes") + self.scan_range = self.caller_kwargs.get("range") + self.scan_scan_time = self.caller_kwargs.get("scan_time") + self.scan_sleep = self.caller_kwargs.get("sleep", 0) + self.scan_delta = self.caller_kwargs.get("delta", 0.5) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_num_frames, + "var_2" : self.scan_scan_time, + "var_3" : self.scan_range, + "var_4" : self.scan_sleep, + "var_5" : self.scan_delta, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) + + # Call super + yield from self.stubs.pre_scan() + + + + + +class MeasureMSOX(MxFlyscanBase): + """ Standard MSOX scan + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_msox(start=42, range=10, exp_time=0.1) + + Parameters + ---------- + start : (float, float) + Scan start position of the axis. + range : (float, float) + Scan range of the axis. + exp_time : (float) + Exposure time for each point [s]. + num_datasets : int + ??? + rest_time : float + ??? + """ + scan_name = "measure_msox" + required_kwargs = ["start", "range", "exp_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_MSOX + self.scan_start = self.caller_kwargs.get("start") + self.scan_range = self.caller_kwargs.get("range") + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_num_datasets = int(self.caller_kwargs.get("num_datasets", 1)) + self.scan_rest_time = self.caller_kwargs.get("rest_time", 0) + + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Reset the scan engine + yield from self.stubs.send_rpc_and_wait("abr", "reset") + + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_exp_time, + "var_4" : self.scan_num_datasets, + "var_5" : self.scan_rest_time, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) + + # Call super + yield from self.stubs.pre_scan() + + + + + + + +class MeasureGrid(MxFlyscanBase): + """ General grid scan + + Note: This was probably never used in its current form, because the + code had an undefined variable 'self.position' + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_msox(start=42, range=10, exp_time=0.1) + + Parameters + ---------- + positions : 2D-array + Scan position of each axis, i.e. (N, 3) shaped array. + ncols : int + Nmber of columns. + angle: + ??? + exp_time : (float) + Exposure time for each point [s]. + """ + scan_name = "measure_raster" + required_kwargs = ["positions", "ncols", "angle", "exp_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = CMD_RASTER_SCAN + self.scan_positions = self.caller_kwargs.get("positions") + self.scan_stepnum_x = int(self.caller_kwargs.get("ncols")) + self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x + self.scan_angle = self.caller_kwargs.get("angle") + self.scan_exp_time = self.caller_kwargs.get("exp_time") + + if len(self.scan_positions)==1: + raise RuntimeWarning("Raster scan with one cell makes no sense") + + # Good practice: new members only in __init__ + self.scan_ystep = 0.010 + self.scan_zstep = 0.010 + self.scan_gmy_step = 0.010 + + + def prepare_positions(self): + + # Calculate step sizes + pos_start = self.scan_positions[0] # first cell on first row + pos_col_2nd = self.scan_positions[1] # second cell on first row + pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row + + self.scan_xstep = pos_col_2nd[0] - pos_start[0] + half_cell = abs(self.scan_xstep) / 2.0 + self.scan_start_x = pos_start[0] - half_cell + self.scan_end_x = pos_col_end[0] + half_cell + + if self.scan_stepnum_y > 1: + pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row + self.scan_ystep = pos_row_2nd[1] - pos_start[1] + self.scan_zstep = pos_row_2nd[2] - pos_start[2] + self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep]) + + # ToDo : This was left out + # self._raster_starting_x = start_x + # self._raster_starting_y = y1 + # self._raster_starting_z = z1 + # self._raster_step_y = ystep + # self._raster_step_z = zstep + # self._raster_current_row = 0 + # self._raster_num_rows = rows + + + + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Reset the scan engine + yield from self.stubs.send_rpc_and_wait("abr", "reset") + + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.omega.position, + "var_2" : self.scan_start_x, + "var_3" : self.scan_end_x, + "var_4" : self.scan_stepnum_x, + "var_5" : self.scan_stepnum_y, + "var_6" : self.scan_angle, + "var_7" : self.scan_exp_time, + "var_8" : HALF_PERIOD, + "var_9" : self._gmx_offset.get, + "var_10" : self.scan_gmy_step, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) + + # Call super + yield from self.stubs.pre_scan() + + +