diff --git a/pxiii_bec/devices/A3200.py b/pxiii_bec/devices/A3200.py index 9109243..d91d4c9 100644 --- a/pxiii_bec/devices/A3200.py +++ b/pxiii_bec/devices/A3200.py @@ -4,40 +4,27 @@ This module provides an object to control the Aerotech Abr rotational stage. -The following constants are defined. - -Measurement phases: - - Aerotech.ABR_DONE - Aerotech.ABR_READY - Aerotech.ABR_BUSY - -Axis mode - - Aerotech.DIRECT_MODE - Aerotech.MEASURING_MODE - -Shutter - - Aerotech.SHUTTER_OPEN - Aerotech.SHUTTER_CLOSE - Methods in the Abr class ======================== +Standard bluesky interface: + AerotechAbrStage.configure(d={...}) + AerotechAbrStage.kickoff() + AerotechAbrStage.stop() +Additional bluesky functionality: + + + + + Aerotech.is_homed() Aerotech.do_homing(wait=True) Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True) Aerotech.is_done() Aerotech.is_ready() Aerotech.is_busy() -Aerotech.stop() Aerotech.start_exposure() Aerotech.wait_status(status) -Aerotech.set_mode(, value) -Aerotech.get_mode() -Aerotech.set_direct_mode() -Aerotech.set_measuring_mode() Aerotech.move(angle, wait=False, speed=None) Aerotech.set_shutter(state) @@ -45,48 +32,38 @@ Attribute Access in Abr class ============================= The Abr class overwrites the getattr and setattr methods to provide a pythonic -way of controlling the Abr. +way of controlling the rootation stage of the Abr. The following properties are implemented: -velo - +velocity Sets the velocity of rotation: ``-ES-DF1:ROTX-SETV`` omega - Move the omega angle without any wait: ``-ES-DF1:ROTX-VAL`` -exposure_time - +exp_time Sets the PV for the measurement's exposure time: ``-ES-OSC:ETIME`` start_angle - Sets the PV for the measurement's starting angle: ``-ES-OSC:START-POS`` oscillation_angle - Sets the PV for the measurement's oscillation angle: ``-ES-OSC:RANGE`` shutter - Controls the shutter: ``-ES-PH1:SET`` and ``-ES-PH1:GET`` -mode - - Controls the axis mode: ``-ES-DF1:AXES-MODE`` - measurement_state + Returns the PV for the measurement state: ``-ES-OSC:DONE`` - Controls the PV for the measurement state: ``-ES-OSC:DONE`` - +axis_mode + Returns the axis mode: ``-ES-DF1:AXES-MODE`` Examples ======== - import Aerotech - abr = Aerotech.Abr() + abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr") # move omega to 270.0 degrees abr.omega = 270.0 @@ -95,7 +72,7 @@ Examples abr.move(180, wait=True) # move omega to 3000 degrees at 360 degrees/s and wait for movement to finish - abr.move(3000, speed=360, wait=True) + abr.move(3000, velocity=360, wait=True) # stop any movement abr.stop() # this function only returns after the STATUS is back to OK @@ -198,341 +175,13 @@ class AerotechAbrStage(Device): # A few PVs still needed from grid raster_scan_done = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config) - raster_num_rows = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config) - - - # def __init__(self): - # self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS") - # self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE") - # self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME") - # self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC") - # self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START") - # self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE") - - # pend_io(10) - - - - def measure_standard(self, start, wedge, etime, ready_rate=500.0): - """measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds""" - self.command(CMD_MEASURE_STANDARD) - self._var_1.set(start).wait() - self._var_2.set(wedge).wait() - self._var_3.set(etime).wait() - self._var_4.set(ready_rate).wait() - self.start_command() - - def measure_scan_slits(self, delta_x, delta_y, velocity): - """measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds""" - self.command(CMD_SLIT_SCAN) - self._var_1.set(delta_x).wait() - self._var_2.set(delta_y).wait() - self._var_3.set(velocity).wait() - self.start_command() - - def measure_raster_simple(self, etime, cell_width, cell_height, ncols, nrows): - """measures the specified grid assuming the goniometer is currently at the CENTER of TOP-LEFT CELL - - etime: a float in seconds - cell_width: a float in mm - cell_height: a float in mm - ncols: an int - nrows: an int - - """ - # self.reset() - self.command(CMD_RASTER_SCAN_SIMPLE) - self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state - self._var_1.set(etime).wait() - self._var_2.set(cell_width).wait() - self._var_3.set(cell_height).wait() - self._var_4.set(int(ncols)).wait() - self._var_5.set(int(nrows)).wait() - self._var_6.set(0).wait() - self._var_7.set(0).wait() - self._var_8.set(0).wait() - self.raster._raster_num_rows = nrows - self.start_command() - - def measure_sastt(self, etime, cell_width, cell_height, ncols, nrows, even_tweak=0.0, odd_tweak=0.0, version=1): - """measures the specified grid assuming the goniometer is currently at the CENTER of the required grid - - etime: a float in seconds - cell_width: a float in mm - cell_height: a float in mm - ncols: an int - nrows: an int - - odd_tweak, even_tweak: float, a distance to be added to the start of - the open shutter command. used on in the scan mode - version=3 - - - these values can be either positive/negative - - they should be small, i.e. just to tweak a trriggering event - by minor amounts - version: an int (1, 2, 3) - 1 = original snake scan, single PSO window - 2 = scan always from LEFT---RIGHT - 3 = snake scan alternating PSO window for even/odd rows - - """ - # self.reset() - if version == 1: - self.command(CMD_SCAN_SASTT) - elif version == 2: - self.command(CMD_SCAN_SASTT_V2) - elif version == 3: - self.command(CMD_SCAN_SASTT_V3) - else: - raise RuntimeError("non existing SAS-TT scan mode") - - self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state - self._var_1.set(etime).wait() - self._var_2.set(cell_width).wait() - self._var_3.set(cell_height).wait() - self._var_4.set(int(ncols)).wait() - self._var_5.set(int(nrows)).wait() - self._var_6.set(even_tweak).wait() - self._var_7.set(odd_tweak).wait() - self._var_8.set(0).wait() - self.raster._raster_num_rows = nrows - self.start_command() - - def measure_vertical_line(self, cellHeight, numCells, exposureTime): - """measure a vertical line using the GMY motor - - The line is nummCells * cellHeight long and the exposureTime is per cell. - - `cellHeight` in mm - `numCells` an integer - `exposureTime` in seconds / per cellHeight - """ - self.command(CMD_VERTICAL_LINE_SCAN) - self._var_1.set(cellHeight).wait() - self._var_2.set(numCells).wait() - self._var_3.set(exposureTime).wait() - self.start_command() - - def measure_screening(self, start, oscangle, etime, degrees, frames, delta=0.5): - self.command(CMD_SCREENING) - self._var_1.set(start).wait() - self._var_2.set(oscangle).wait() - self._var_3.set(etime).wait() - self._var_4.set(degrees).wait() - self._var_5.set(frames).wait() - self._var_6.set(delta).wait() - self.start_command() - - def measure_fast_omega(self, start, wedge, etime, rate=200.0): - self.command(CMD_SUPER_FAST_OMEGA) - self._var_1.set(start).wait() - self._var_2.set(wedge).wait() - self._var_3.set(etime).wait() - self._var_4.set(rate).wait() - self.start_command() - - def measure_still_wedge(self, start, wedge, etime, oscangle, sleep_after_shutter_close=0.010): - self.command(CMD_STILL_WEDGE) - self._var_1.set(start).wait() - self._var_2.set(wedge).wait() - self._var_3.set(etime).wait() - self._var_4.set(oscangle).wait() - self._var_5.set(sleep_after_shutter_close).wait() - self.start_command() - - def measure_stills(self, number_of_frames, exposure_time, idle=0.0): - """collect still images, i.e. no omega rotation - - number_of_frames => an integer - exposure_time => a float - idle => sleep in between each exposure - - IMPORTANT: use idle=0.0 to prevent shutter open/close - - """ - self.command(CMD_STILLS) - self._var_1.set(number_of_frames).wait() - self._var_2.set(exposure_time).wait() - self._var_3.set(idle).wait() - self.start_command() - - def measure_repeat_singles(self, frames, etime, oscangle, sleep=0.0, delta=0.5): - self.command(CMD_REPEAT_SINGLE_OSCILLATION) - self._var_1.set(frames).wait() - self._var_2.set(etime).wait() - self._var_3.set(oscangle).wait() - self._var_4.set(sleep).wait() - self._var_5.set(delta).wait() - self.start_command() - - def measure_single(self, start_angle, oscillation_angle, exposure_time, delta=0.0, settle=0.0): - self.command(CMD_SINGLE_OSCILLATION) - self._var_1.set(start_angle).wait() - self._var_2.set(oscillation_angle).wait() - self._var_3.set(exposure_time).wait() - self._var_4.set(delta).wait() - self._var_5.set(settle).wait() - self.start_command() - - def measure_raster(self, positions, columns, angle, etime): - """raster via generic experiment interface""" - self.reset() - if len(positions) == 1: - raise RuntimeWarning("Raster scan with one cell makes no sense") - self.command(CMD_RASTER_SCAN) - - rows = len(positions) / columns - x1, y1, z1 = tuple(positions[0]) # first cell on first row - x2, y2, z2 = tuple(positions[1]) # second cell on first row - x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row - - if rows > 1: - x3, y3, z3 = tuple(positions[columns]) # first cell on second row - ystep = y3 - y1 - zstep = z3 - z1 - gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2)) - else: - ystep = 0.010 - zstep = 0.010 - gmy_step = 0.010 - - half_cell = abs(x2 - x1) / 2.0 - start_x = x1 - half_cell - end_x = x4 + half_cell - - self._raster_starting_x = start_x - self._raster_starting_y = y1 - self._raster_starting_z = z1 - self._raster_step_y = ystep - self._raster_step_z = zstep - self._raster_current_row = 0 - self._raster_num_rows = rows - - self._var_1.set(self.position).wait() - self._var_2.set(start_x).wait() - self._var_3.set(end_x).wait() - self._var_4.set(columns).wait() - self._var_5.set(rows).wait() - self._var_6.set(angle).wait() - self._var_7.set(etime).wait() - self._var_8.set(HALF_PERIOD).wait() - self._var_9.set(self.gmx.offset.get()).wait() - self._var_10.set(gmy_step).wait() - - self.start_command() - - def measure_raster_still(self, positions, columns, etime, delay): - """raster via generic experiment interface - - positions: list of coordinates for each cell - columns: how many columns in grid - etime: exposure time / cell_width - delay: delay configured in delay generator - to trigger detector after aerotech; - must be greater than actual shutter delay - """ - self.reset() - if len(positions) == 1: - raise RuntimeWarning("Raster scan with one cell makes no sense") - self.command(CMD_RASTER_SCAN_STILL) - - rows = len(positions) / columns - x1, y1, z1 = tuple(positions[0]) # first cell on first row - x2, y2, z2 = tuple(positions[1]) # second cell on first row - x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row - - if rows > 1: - x3, y3, z3 = tuple(positions[columns]) # first cell on second row - ystep = y3 - y1 - zstep = z3 - z1 - gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2)) - else: - ystep = 0.010 - zstep = 0.010 - gmy_step = 0.010 - - half_cell = abs(x2 - x1) / 2.0 - start_x = x1 - half_cell - end_x = x4 + half_cell - - self._raster_starting_x = start_x - self._raster_starting_y = y1 - self._raster_starting_z = z1 - self._raster_step_y = ystep - self._raster_step_z = zstep - self._raster_current_row = 0 - self._raster_num_rows = rows - - self._var_1.set(self.position).wait() - self._var_2.set(start_x).wait() - self._var_3.set(end_x).wait() - self._var_4.set(columns).wait() - self._var_5.set(rows).wait() - self._var_6.set(delay).wait() - self._var_7.set(etime).wait() - self._var_8.set(HALF_PERIOD).wait() - self._var_9.set(self.gmx.offset.get()).wait() - self._var_10.set(gmy_step).wait() - - self.start_command() - - def measure_jungfrau(self, columns, rows, width, height, etime, sleep=0.100): - """for Jungfrau, grid from current position""" - self.reset() - self.command(CMD_JUNGFRAU) - self._var_1.set(columns).wait() - self._var_2.set(rows).wait() - self._var_3.set(height).wait() - self._var_4.set(width).wait() - self._var_5.set(etime).wait() - self._var_6.set(sleep).wait() - self._var_7.set(0.0).wait() - self._var_8.set(0.0).wait() - self._var_9.set(0.0).wait() - self._var_10.set(0.0).wait() - - self.start_command() - timeout = None if sleep <= 0 else 5.0 + (rows * sleep) + (columns * rows * etime) - - if timeout: - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - def isReady(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.raster.scan_done, isReady, timeout=timeout, settle_time=0.5) - status.wait() - - self.set_direct_mode() - - def measure_msox(self, start, wedge, etime, num_datasets=1, rest_time=0.0): - """for msox - start = start omega - wedge = total range to measure - etime = total time for wedge - num_datasets = how many times to repeat - rest_time = how many seconds to rest in between datasets - """ - self.reset() - self.command(CMD_MSOX) - self._var_1.set(start).wait() - self._var_2.set(wedge).wait() - self._var_3.set(etime).wait() - self._var_4.set(num_datasets).wait() - self._var_5.set(rest_time).wait() - self._var_6.set(0).wait() - self._var_7.set(0).wait() - self._var_8.set(0).wait() - self._var_9.set(0).wait() - self._var_10.set(0).wait() - - self.start_command() + raster_num_rows = Component(EpicsSignal, "-ES-GRD:ROW-DONE", kind=Kind.config) + def set_axis_mode(self, mode: str, settle_time=0.1) -> None: + if mode=="direct": + self.axisModeDirect.set(37, settle_time=settle_time).wait() + if mode=="measuring": + self.axisAxesMode.set(MEASURING_MODE, settle_time=settle_time).wait() def configure(self, d: dict) -> tuple: @@ -572,9 +221,9 @@ class AerotechAbrStage(Device): """ return self.raster.complete() - def reset(self): - self.command(CMD_NONE) - self.set_direct_mode() + def reset(self, settle_time=0.1): + self.scan_command.set(CMD_NONE, settle_time=settle_time).wait() + self.set_axis_mode("direct", settle_time=settle_time) def is_ioc_ok(self): return 0 == self.status.get() @@ -597,27 +246,27 @@ class AerotechAbrStage(Device): PARAMETERS `wait` true / false if the routine is to wait for the homing to finish. """ - self.omega.home.set(1).wait() + self.omega.home.set(1, settle_time=1).wait() poll(1.0) if not wait: return while not self.omega.is_homed(): - poll(0.2) + time.sleep(0.2) @property - def velo(self): + def velocity(self): return self.omega.velocity.get() - @velo.setter - def velo(self, value): + @velocity.setter + def velocity(self, value): self.omega.velocity.set(value).wait() @property - def etime(self): - return self.osc.etime.get() + def exp_time(self): + return self.osc.exp_time.get() - @etime.setter - def etime(self, value): + @exp_time.setter + def exp_time(self, value): self.osc.etime.set(value).wait() @property @@ -657,17 +306,9 @@ class AerotechAbrStage(Device): return state == self._shutter.get() @property - def mode(self): + def axis_mode(self): return self.axisAxesMode.get() - @mode.setter - def mode(self, value): - self.axisAxesMode.set(value).wait() - - - - - def get_ready(self, ostart=None, orange=None, etime=None, wait=True): self.wait_for_movements() if self.measurement_state == ABR_BUSY: @@ -683,9 +324,7 @@ class AerotechAbrStage(Device): if etime is not None: self.osc.etime.set(etime).wait() - self.osc.get_ready.set(1).wait() - - poll(0.1) + self.osc.get_ready.set(1, settle_time=0.1).wait() if wait: for n in range(10): @@ -707,7 +346,7 @@ class AerotechAbrStage(Device): moving = True while not timeisup and moving: - poll(0.1) + time.sleep(0.1) try: moving = self.is_moving() except: @@ -731,10 +370,9 @@ class AerotechAbrStage(Device): couple of seconds. """ - self.taskStop.put(1, wait=True) - poll(wait_after_reload) + self.taskStop.set(1, settle_time=wait_after_reload).wait() self.reset() - poll(0.1) + time.sleep(0.1) reload_programs = stop @@ -746,79 +384,16 @@ class AerotechAbrStage(Device): try: self.osc.wait_status(ABR_BUSY, timeout=1) except RuntimeWarning as e: - print(str(e), end=" ") - print(" --- trying start again.") - self.osc.taskStart.set(1).wait() + print("%s --- trying start again.", str(e)) + self.osc.kickoff() def move(self, position, wait=False, velocity=None, relative=None, direct=False, **kwargs) -> MoveStatus: - """Move omega to `angle` at `speed` deg/s and `wait` (or not). - - If a `speed` is specified, it will be used for the movement but the speed - setting will be reverted to its old value afterwards. - - If `wait` is true, the routine waits until the omega position is close - to within 0.01 degrees to the target or that a timeout occured. The - timeout is the time required for the movement plus 5 seconds. - - In case of a timeout a RuntimeWarning is raised. - - PARAMETERS: - `position` [degrees] a float specifying where to move to. - No limits. - `velo` [degrees/s] a float specifying speed to move at. - Limits: 0 - 360 - `wait` [boolean] wait or not for movement to finish. + """ Move the omega axis """ return self.omega.move(position, wait=wait, velocity=velocity, relative=relative, direct=direct, **kwargs) - def set_shutter(self, state): - if self.axisAxesMode.get(): - print("ABR is not in direct mode; cannot manipulate shutter") - return False - state = str(state).lower() - if state not in ["1", "0", "closed", "open"]: - print("unknown shutter state requested") - return None - elif state in ["1", "open"]: - state = 1 - elif state == ["0", "closed"]: - state = 0 - self._shutter.put(state, wait=True) - return state == self._shutter.get() if __name__ == "__main__": - aero = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr") - aero.wait_for_connection() - - # import argparse - # from time import sleep - - # parser = argparse.ArgumentParser(description="aerotech client") - # parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true") - # parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true") - # parser.add_argument("--measure-standard", help="measure a standard dataset", action="store") - # parser.add_argument("--omega", help="move omega to...", action="store") - # parser.add_argument("--gmx", help="move gmx to...", action="store") - # parser.add_argument("--gmy", help="move gmy to...", action="store") - # parser.add_argument("--gmz", help="move gmz to...", action="store") - # args = parser.parse_args() - - # abr = Abr() - # if args.stop: - # abr.stop() - - - # elif args.sanitize_speeds: - # abr.velo(AXIS_GMX, 50.0) - # abr.velo(AXIS_GMY, 10.0) - # abr.velo(AXIS_GMZ, 10.0) - # abr.velo(AXIS_OMEGA, 180.0) - - # elif args.gmx: - # abr.move_x(args.gmx) - - # elif args.measure_standard: - # start_angle, total_range, total_time = args.measure_standard.split(",") - # abr.measure_standard(start_angle, total_range, total_time) - + abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr") + abr.wait_for_connection() \ No newline at end of file diff --git a/pxiii_bec/devices/aeroA3200.py b/pxiii_bec/devices/aeroA3200.py index 11a2bcd..e319038 100644 --- a/pxiii_bec/devices/aeroA3200.py +++ b/pxiii_bec/devices/aeroA3200.py @@ -343,10 +343,8 @@ class A3200RasterScanner(Device): class A3200Oscillator(Device): - """Positioner wrapper for motors on the Aerotech A3200 controller. As the - IOC does not provide a motor record, this class simply wraps axes into - a standard Ophyd positioner for the BEC. It also has some additional - functionality for diagnostics.""" + """No clue what this does, seems to be redundant with the task based grid scanners... + """ ostart_pos = Component(EpicsSignal, "-ES-OSC:START-POS", put_complete=True, kind=Kind.config) orange = Component(EpicsSignal, "-ES-OSC:RANGE", put_complete=True, kind=Kind.config) @@ -367,6 +365,9 @@ class A3200Oscillator(Device): def is_busy(self): return ABR_BUSY == self.phase.get() + def kickoff(self): + self.osc.taskStart.set(1).wait() + def wait_status(self, target, timeout=60.0) -> SubscriptionStatus: """Wait for the Aertotech IOC to reach the desired `status`. diff --git a/pxiii_bec/scans/mx_measurements.py b/pxiii_bec/scans/mx_measurements.py index 5208f8f..9329772 100644 --- a/pxiii_bec/scans/mx_measurements.py +++ b/pxiii_bec/scans/mx_measurements.py @@ -1,8 +1,4 @@ - -+ 119 -− 0 import time - import numpy as np from bec_lib import bec_logger @@ -11,32 +7,44 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase logger = bec_logger.logger -CMD_NONE = 0 -CMD_RASTER_SCAN_SIMPLE = 1 -CMD_MEASURE_STANDARD = 2 -CMD_VERTICAL_LINE_SCAN = 3 -CMD_SCREENING = 4 -CMD_SUPER_FAST_OMEGA = 5 -CMD_STILL_WEDGE = 6 -CMD_STILLS = 7 -CMD_REPEAT_SINGLE_OSCILLATION = 8 -CMD_SINGLE_OSCILLATION = 9 -CMD_OLD_FASHIONED = 10 -CMD_RASTER_SCAN = 11 -CMD_JET_ROTATION = 12 -CMD_X_HELICAL = 13 -CMD_X_RUNSEQ = 14 -CMD_JUNGFRAU = 15 -CMD_MSOX = 16 -CMD_SLIT_SCAN = 17 -CMD_RASTER_SCAN_STILL = 18 -CMD_SCAN_SASTT = 19 -CMD_SCAN_SASTT_V2 = 20 -CMD_SCAN_SASTT_V3 = 21 +FULL_PERIOD = 0 +HALF_PERIOD = 1 + +class SasttSnakeMode: + SNAKE_SINGLE = 0 + UNIDIRECTIONAL = 1 + SNAKE_DOUBLE_PSO = 2 + + +class AbrCmd: + NONE = 0 + RASTER_SCAN_SIMPLE = 1 + MEASURE_STANDARD = 2 + VERTICAL_LINE_SCAN = 3 + SCREENING = 4 + SUPER_FAST_OMEGA = 5 + STILL_WEDGE = 6 + STILLS = 7 + REPEAT_SINGLE_OSCILLATION = 8 + SINGLE_OSCILLATION = 9 + OLD_FASHIONED = 10 + RASTER_SCAN = 11 + JET_ROTATION = 12 + X_HELICAL = 13 + X_RUNSEQ = 14 + JUNGFRAU = 15 + MSOX = 16 + SLIT_SCAN = 17 + RASTER_SCAN_STILL = 18 + SCAN_SASTT = 19 + SCAN_SASTT_V2 = 20 + SCAN_SASTT_V3 = 21 class MxFlyscanBase(AsyncFlyScanBase): """ Base class for MX flyscans + + I'd be quite easy to improve standardization. """ scan_report_hint = "table" arg_input = {} @@ -69,93 +77,89 @@ class MxFlyscanBase(AsyncFlyScanBase): - - - class MeasureStandardWedge(MxFlyscanBase): - """ Measure a standard wedge scan + """ Standard wedge scan + + Measure a standard continous line scan from `start` to `start` + `range` + during `scan_time` on the Omega axis. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.standard_wedge(start=42, range=10, scan_time=20) + + Parameters + ---------- + start : (float, float) + Scan start position of the axis. + range : (float, float) + Scan range of the axis. + scan_time : (float) + Total fly time for the movement [s]. + ready_rate : float, optional + No clue what is this... (default=500) """ scan_name = "standard_wedge" - required_kwargs = ["start", "range"] + required_kwargs = ["start", "range", "scan_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - """ Standard measurement scan - - Measure a standard wedge from `start` to `start` + `wedge` during - `etime` on the Omega axis. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.standard_wedge(start=42, range=10, scan_time=20) - - Parameters - ---------- - start : (float, float) - Scan start position of the axis. - range : (float, float) - Scan range of the axis. - scan_time : (float) - Total scan time for the movement [s]. - ready_rate : float, optional - No clue what is this... (default=500) - """ super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_MEASURE_STANDARD + self.scan_command = AbrCmd.MEASURE_STANDARD self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") - self.scan_scan_time = float(self.caller_kwargs.get("scan_time", 5)) + self.scan_scan_time = float(self.caller_kwargs.get("scan_time")) self.scan_ready_rate = float(self.caller_kwargs.get("ready_rate", 500)) def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_start) - yield from self.stubs.command("abr", "_var_2.set", self.scan_range) - yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) - yield from self.stubs.command("abr", "_var_4.set", self.scan_ready_rate) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_scan_time, + "var_4" : self.scan_ready_rate, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super - yield from self.stubs.pre_scan() + yield from super().pre_scan() + class MeasureScanSlits(MxFlyscanBase): - """ Measure a slit scan + """ Slit scan + + Measure a standard slit scan. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.slit_scan(start=42, range=10, scan_time=20) + + Parameters + ---------- + delta_x : (float, float) + Scan range with slit in x. + delta_y : (float, float) + Scan range with slit in y. + velocity : (float) + Scan velocity [mm/s]. """ scan_name = "slit_scan" - required_kwargs = ["start", "range"] + required_kwargs = ["delta_x", "delta_y", "velocity"] def __init__(self, *args, parameter: dict = None, **kwargs): - """ Slit scan - - Measure a standard slit scan. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.slit_scan(start=42, range=10, scan_time=20) - - Parameters - ---------- - delta_x : (float, float) - Scan range with slit in x. - delta_y : (float, float) - Scan range with slit in y. - velocity : (float) - Scan velocity [mm/s]]. - """ super().__init__(parameter=parameter, **kwargs) self.axis = [] - self.scan_command = CMD_SLIT_SCAN + self.scan_command = AbrCmd.SLIT_SCAN self.scan_delta_x = self.caller_kwargs.get("delta_x") self.scan_delta_y = self.caller_kwargs.get("delta_y") self.scan_velocity = self.caller_kwargs.get("velocity") @@ -163,52 +167,53 @@ class MeasureScanSlits(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_delta_x) - yield from self.stubs.command("abr", "_var_2.set", self.scan_delta_y) - yield from self.stubs.command("abr", "_var_3.set", self.scan_velocity) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_delta_x, + "var_2" : self.scan_delta_y, + "var_3" : self.scan_velocity, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super - yield from self.stubs.pre_scan() + yield from super().pre_scan() + class MeasureRasterSimple(MxFlyscanBase): - """ Measure a grid scan + """ Simple raster scan + + Measure a simplified raster scan, assuming the goniometer is currently + at the CENTER of TOP-LEFT CELL. + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.raster_simple(exp_time=0.5, cell_width=0.2, cell_height=0.2, ncols=10, nrows=10) + + Parameters + ---------- + exp_time : (float) + Exposure time per point [s]. + cell_width : (float) + Scan step size [mm]. + cell_height : (float) + Scan step size [mm]. + ncols : (int) + Number of scan steps. + nrows : (int) + Number of scan steps. """ scan_name = "raster_simple" required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"] def __init__(self, *args, parameter: dict = None, **kwargs): - """ Simnple raster scan - - Measure a simplified raster scan, assuming the goniometer is currently - at the CENTER of TOP-LEFT CELL. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.raster_simple(exp_time=0.5, cell_width=0.2, cell_height=0.2, ncols=10, nrows=10) - - Parameters - ---------- - exp_time : (float) - Exposure time per point [s]. - cell_width : (float) - Scan step size [mm]. - cell_height : (float) - Scan step size [mm]. - ncols : (int) - Number of scan steps. - nrows : (int) - Number of scan steps. - """ super().__init__(parameter=parameter, **kwargs) self.axis = [] - self.scan_command = CMD_RASTER_SCAN_SIMPLE + self.scan_command = AbrCmd.RASTER_SCAN_SIMPLE self.scan_exp_time = self.caller_kwargs.get("exp_time") self.scan_stepsize_x = self.caller_kwargs.get("cell_width") self.scan_stepsize_y = self.caller_kwargs.get("cell_height") @@ -218,36 +223,35 @@ class MeasureRasterSimple(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables + # Reset done flag yield from self.stubs.command("abr", "raster_scan_done.set", 0) - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time) - yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x) - yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y) - yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x) - yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y) - yield from self.stubs.command("abr", "_var_6.set", 0) - yield from self.stubs.command("abr", "_var_7.set", 0) - yield from self.stubs.command("abr", "_var_8.set", 0) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_exp_time, + "var_2" : self.scan_stepsize_x, + "var_3" : self.scan_stepsize_y, + "var_4" : self.scan_stepnum_x, + "var_5" : self.scan_stepnum_y, + "var_6" : 0, + "var_7" : 0, + "var_8" : 0, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # ToDo: This line was left out #self.raster._raster_num_rows = nrows # Call super - yield from self.stubs.pre_scan() + yield from super().pre_scan() + class MeasureSASTT(MxFlyscanBase): - """ Measure a small angle scanning tensor tomography scan - """ - scan_name = "measure_sastt" - required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"] + """ Small angle scanning tensor tomography scan - def __init__(self, *args, parameter: dict = None, **kwargs): - """ Standard measurement scan - - Measure a simplified grid scan, assuming the goniometer is currently - at the CENTER of the required grid. + Measure a Small Angle Scanning Tensor Tomography scan on the specified + grid assuming the goniometer is currently at the CENTER of the required + grid. The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -280,16 +284,20 @@ class MeasureSASTT(MxFlyscanBase): 2 = scan always from LEFT---RIGHT 3 = snake scan alternating PSO window for even/odd rows """ + scan_name = "measure_sastt" + required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"] + + def __init__(self, *args, parameter: dict = None, **kwargs): super().__init__(parameter=parameter, **kwargs) self.axis = [] self.scan_version = int(self.caller_kwargs.get("version", 1)) if self.scan_version==1: - self.scan_command = CMD_SCAN_SASTT + self.scan_command = AbrCmd.SCAN_SASTT if self.scan_version==2: - self.scan_command = CMD_SCAN_SASTT_V2 + self.scan_command = AbrCmd.SCAN_SASTT_V2 if self.scan_version==3: - self.scan_command = CMD_SCAN_SASTT_V3 + self.scan_command = AbrCmd.SCAN_SASTT_V3 else: raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}") @@ -304,17 +312,20 @@ class MeasureSASTT(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables + # Reset done flag yield from self.stubs.command("abr", "raster_scan_done.set", 0) - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_exp_time) - yield from self.stubs.command("abr", "_var_2.set", self.scan_stepsize_x) - yield from self.stubs.command("abr", "_var_3.set", self.scan_stepsize_y) - yield from self.stubs.command("abr", "_var_4.set", self.scan_stepnum_x) - yield from self.stubs.command("abr", "_var_5.set", self.scan_stepnum_y) - yield from self.stubs.command("abr", "_var_6.set", self.scan_even_tweak) - yield from self.stubs.command("abr", "_var_7.set", self.scan_odd_tweak) - yield from self.stubs.command("abr", "_var_8.set", 0) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_exp_time, + "var_2" : self.scan_stepsize_x, + "var_3" : self.scan_stepsize_y, + "var_4" : self.scan_stepnum_x, + "var_5" : self.scan_stepnum_y, + "var_6" : self.scan_even_tweak, + "var_7" : self.scan_odd_tweak, + "var_8" : 0, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # ToDo: This line was left out # self.raster._raster_num_rows = nrows @@ -323,10 +334,11 @@ class MeasureSASTT(MxFlyscanBase): yield from self.stubs.pre_scan() -class MeasureVerticalLine(MxFlyscanBase): - """ Measure a vertical line using the GMY motor - Simple fly scan that performs a single scan vertically. +class MeasureVerticalLine(MxFlyscanBase): + """ Vertical line scan using the GMY motor + + Simple fly scan that records a single vertical line. The line is nummCells * cellHeight long and the exposureTime is per cell. The scan itself is executed by the scan service running on the Aerotech @@ -334,45 +346,47 @@ class MeasureVerticalLine(MxFlyscanBase): Example ------- - >>> scans.measure_vertical_line(start=42, range=10, scan_time=20) + >>> scans.measure_vline(cell_height=12, cell_num=10, cell_time=20) Parameters ---------- cell_height : (float) Step size [mm]. - nrows : (int) + cell_num : (int) Scan range of the axis. - exp_time : (float) + cell_time : (float) Exposure time per cell [s] """ - scan_name = "measure_vertical_line" - required_kwargs = ["exp_time", "cell_height", "nrows"] + scan_name = "measure_vline" + required_kwargs = ["cell_time", "cell_height", "cell_num"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['gmy'] - self.scan_command = CMD_VERTICAL_LINE_SCAN - self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_command = AbrCmd.VERTICAL_LINE_SCAN + self.scan_cell_time = self.caller_kwargs.get("cell_time") self.scan_stepsize_y = self.caller_kwargs.get("cell_height") - self.scan_stepnum_y = int(self.caller_kwargs.get("nrows")) + self.scan_stepnum_y = int(self.caller_kwargs.get("cell_num")) def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_stepsize_y) - yield from self.stubs.command("abr", "_var_2.set", self.scan_stepnum_y) - yield from self.stubs.command("abr", "_var_3.set", self.scan_exp_time) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_stepsize_y, + "var_2" : self.scan_stepnum_y, + "var_3" : self.scan_cell_time, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super - yield from self.stubs.pre_scan() + yield from super().pre_scan() + class MeasureScreening(MxFlyscanBase): - """ Measure a sample screening program + """ Sample screening scan The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -383,27 +397,25 @@ class MeasureScreening(MxFlyscanBase): Parameters ---------- - start : (float) + start : float Scan start position of the axis. - range : (float) + range : float Scan range of the axis. - scan_time : (float) + scan_time : float Total scan time for the movement [s]. degrees : (???) frames : (???) delta : (???) (default: 0.5). - """ scan_name = "measure_screening" required_kwargs = ["start", "range", "scan_time", "degrees", "frames"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['gmy'] - self.scan_command = CMD_SCREENING + self.scan_command = AbrCmd.SCREENING self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") self.scan_scan_time = self.caller_kwargs.get("scan_time") @@ -414,25 +426,24 @@ class MeasureScreening(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_start) - yield from self.stubs.command("abr", "_var_2.set", self.scan_range) - yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) - yield from self.stubs.command("abr", "_var_4.set", self.scan_degrees) - yield from self.stubs.command("abr", "_var_5.set", self.scan_frames) - yield from self.stubs.command("abr", "_var_6.set", self.scan_delta) - + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_scan_time, + "var_4" : self.scan_degrees, + "var_5" : self.scan_frames, + "var_6" : self.scan_delta, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super - yield from self.stubs.pre_scan() - - + yield from super().pre_scan() class MeasureFastOmega(MxFlyscanBase): - """ Measure a fast omega scan + """ Fast omega scan MEasures a fast rotational scan with the rotation stage (omega). @@ -445,24 +456,23 @@ class MeasureFastOmega(MxFlyscanBase): Parameters ---------- - start : (float) + start : float Scan start position of the axis. - range : (float) + range : float Scan range of the axis. - scan_time : (float) + scan_time : float Total scan time for the movement [s]. rate : (???) (default: 200). """ scan_name = "measure_fast_omega" - required_kwargs = ["start", "range", "exp_time"] + required_kwargs = ["start", "range", "scan_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_SUPER_FAST_OMEGA + self.scan_command = AbrCmd.SUPER_FAST_OMEGA self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") self.scan_scan_time = self.caller_kwargs.get("scan_time") @@ -471,21 +481,22 @@ class MeasureFastOmega(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_start) - yield from self.stubs.command("abr", "_var_2.set", self.scan_range) - yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) - yield from self.stubs.command("abr", "_var_4.set", self.scan_rate) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_scan_time, + "var_4" : self.scan_rate, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super yield from self.stubs.pre_scan() - class MeasureStillWedge(MxFlyscanBase): - """ Measure a still wedge scan + """ Still wedge scan The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -510,11 +521,10 @@ class MeasureStillWedge(MxFlyscanBase): required_kwargs = ["start", "range", "exp_time", "oscrange"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_STILL_WEDGE + self.scan_command = AbrCmd.STILL_WEDGE self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") self.scan_scan_time = self.caller_kwargs.get("scan_time") @@ -524,24 +534,23 @@ class MeasureStillWedge(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_start) - yield from self.stubs.command("abr", "_var_2.set", self.scan_range) - yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) - yield from self.stubs.command("abr", "_var_4.set", self.scan_oscrange) - yield from self.stubs.command("abr", "_var_5.set", self.scan_shutter_sleep) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_scan_time, + "var_4" : self.scan_oscrange, + "var_5" : self.scan_shutter_sleep, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super yield from self.stubs.pre_scan() - - - class MeasureStills(MxFlyscanBase): - """ Measure a still image sequence + """ Still image sequence scan Measures a series of still images without any motor movement. IMPORTANT: use idle_time=0.0 to prevent shutter open/close @@ -551,26 +560,25 @@ class MeasureStills(MxFlyscanBase): Example ------- - >>> scans.measure_stils(nframes=100, exp_time=0.1, idle_time=30) + >>> scans.measure_stills(nframes=100, exp_time=0.1, idle_time=30) Parameters ---------- - nframes : (int) + nframes : int Total number of frames to be recorded. - exposure_time : (float) + exposure_time : float Exposure time of each frame [s]. - idle_time : (float) + idle_time : float Sleep time between the frames [s]. """ - scan_name = "measure_stils" - required_kwargs = ["exp_time", "nframes"] + scan_name = "measure_stills" + required_kwargs = ["exp_time", "nframes", "idle_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = [] - self.scan_command = CMD_STILLS + self.scan_command = AbrCmd.STILLS self.scan_exp_time = self.caller_kwargs.get("exp_time") self.scan_num_frames = int(self.caller_kwargs.get("nframes")) self.scan_idle_time = self.caller_kwargs.get("idle_time") @@ -578,22 +586,21 @@ class MeasureStills(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_num_frames) - yield from self.stubs.command("abr", "_var_2.set", self.scan_exp_time) - yield from self.stubs.command("abr", "_var_3.set", self.scan_idle_time) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_num_frames, + "var_2" : self.scan_exp_time, + "var_3" : self.scan_idle_time, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super yield from self.stubs.pre_scan() - - - class MeasureSingleOscillation(MxFlyscanBase): - """ Measure a single oscillation + """ Single oscillation scan The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -604,11 +611,11 @@ class MeasureSingleOscillation(MxFlyscanBase): Parameters ---------- - start : (float) + start : float Scan start position of the axis. - range : (float) + range : float Oscillation range of the axis. - scan_time : (float) + scan_time : float Total scan time for the movement [s]. delta : (???) (default: 0). @@ -619,11 +626,10 @@ class MeasureSingleOscillation(MxFlyscanBase): required_kwargs = ["start", "range", "scan_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_SINGLE_OSCILLATION + self.scan_command = AbrCmd.SINGLE_OSCILLATION self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") self.scan_scan_time = self.caller_kwargs.get("scan_time") @@ -633,23 +639,23 @@ class MeasureSingleOscillation(MxFlyscanBase): def pre_scan(self): # ToDo: Move roughly to start position - # Set the global variables - yield from self.stubs.command("abr", "scan_command.set", self.scan_command) - yield from self.stubs.command("abr", "_var_1.set", self.scan_start) - yield from self.stubs.command("abr", "_var_2.set", self.scan_range) - yield from self.stubs.command("abr", "_var_3.set", self.scan_scan_time) - yield from self.stubs.command("abr", "_var_4.set", self.scan_delta) - yield from self.stubs.command("abr", "_var_5.set", self.scan_settle) + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_start, + "var_2" : self.scan_range, + "var_3" : self.scan_scan_time, + "var_4" : self.scan_delta, + "var_5" : self.scan_settle, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) # Call super yield from self.stubs.pre_scan() - - class MeasureRepeatedOscillation(MxFlyscanBase): - """ Measure oscillation repeatedly + """ Repeated oscillation scan The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -660,27 +666,25 @@ class MeasureRepeatedOscillation(MxFlyscanBase): Parameters ---------- - nframes : (int) + nframes : int ??? - scan_time : (float) + scan_time : float Total scan time for the movement [s]. - range : (float) + range : float Oscillation range of the axis. settle : (???) (default: 0). delta : (???) (default: 0.5). - """ scan_name = "measure_multi_osc" - required_kwargs = ["start", "range", "scan_time"] + required_kwargs = ["nframes", "range", "scan_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_REPEAT_SINGLE_OSCILLATION + self.scan_command = AbrCmd.REPEAT_SINGLE_OSCILLATION self.scan_num_frames = self.caller_kwargs.get("nframes") self.scan_range = self.caller_kwargs.get("range") self.scan_scan_time = self.caller_kwargs.get("scan_time") @@ -705,8 +709,6 @@ class MeasureRepeatedOscillation(MxFlyscanBase): - - class MeasureMSOX(MxFlyscanBase): """ Standard MSOX scan @@ -719,11 +721,11 @@ class MeasureMSOX(MxFlyscanBase): Parameters ---------- - start : (float, float) + start : float Scan start position of the axis. - range : (float, float) + range : float Scan range of the axis. - exp_time : (float) + exp_time : float Exposure time for each point [s]. num_datasets : int ??? @@ -734,18 +736,16 @@ class MeasureMSOX(MxFlyscanBase): required_kwargs = ["start", "range", "exp_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_MSOX + self.scan_command = AbrCmd.MSOX self.scan_start = self.caller_kwargs.get("start") self.scan_range = self.caller_kwargs.get("range") self.scan_exp_time = self.caller_kwargs.get("exp_time") self.scan_num_datasets = int(self.caller_kwargs.get("num_datasets", 1)) self.scan_rest_time = self.caller_kwargs.get("rest_time", 0) - def pre_scan(self): # ToDo: Move roughly to start position @@ -759,6 +759,11 @@ class MeasureMSOX(MxFlyscanBase): "var_3" : self.scan_exp_time, "var_4" : self.scan_num_datasets, "var_5" : self.scan_rest_time, + "var_6" : 0, + "var_7" : 0, + "var_8" : 0, + "var_9" : 0, + "var_10" : 0, } yield from self.stubs.send_rpc_and_wait("abr", "configure", d) @@ -767,10 +772,6 @@ class MeasureMSOX(MxFlyscanBase): - - - - class MeasureGrid(MxFlyscanBase): """ General grid scan @@ -782,7 +783,7 @@ class MeasureGrid(MxFlyscanBase): Example ------- - >>> scans.measure_msox(start=42, range=10, exp_time=0.1) + >>> scans.measure_raster(start=42, range=10, exp_time=0.1) Parameters ---------- @@ -799,11 +800,10 @@ class MeasureGrid(MxFlyscanBase): required_kwargs = ["positions", "ncols", "angle", "exp_time"] def __init__(self, *args, parameter: dict = None, **kwargs): - super().__init__(parameter=parameter, **kwargs) self.axis = ['omega'] - self.scan_command = CMD_RASTER_SCAN + self.scan_command = AbrCmd.RASTER_SCAN self.scan_positions = self.caller_kwargs.get("positions") self.scan_stepnum_x = int(self.caller_kwargs.get("ncols")) self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x @@ -814,13 +814,14 @@ class MeasureGrid(MxFlyscanBase): raise RuntimeWarning("Raster scan with one cell makes no sense") # Good practice: new members only in __init__ + self.scan_start_x = None + self.scan_end_x = None + self.scan_xstep = 0.010 self.scan_ystep = 0.010 self.scan_zstep = 0.010 self.scan_gmy_step = 0.010 - def prepare_positions(self): - # Calculate step sizes pos_start = self.scan_positions[0] # first cell on first row pos_col_2nd = self.scan_positions[1] # second cell on first row @@ -837,6 +838,10 @@ class MeasureGrid(MxFlyscanBase): self.scan_zstep = pos_row_2nd[2] - pos_start[2] self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep]) + # ToDo: Check with Zac what are theese parameters + self.scan_omega_position = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get") + self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get") + # ToDo : This was left out # self._raster_starting_x = start_x # self._raster_starting_y = y1 @@ -846,9 +851,6 @@ class MeasureGrid(MxFlyscanBase): # self._raster_current_row = 0 # self._raster_num_rows = rows - - - def pre_scan(self): # ToDo: Move roughly to start position @@ -857,7 +859,7 @@ class MeasureGrid(MxFlyscanBase): # Configure the global variables d = {"scan_command" : self.scan_command, - "var_1" : self.omega.position, + "var_1" : self.scan_omega_position, "var_2" : self.scan_start_x, "var_3" : self.scan_end_x, "var_4" : self.scan_stepnum_x, @@ -865,7 +867,7 @@ class MeasureGrid(MxFlyscanBase): "var_6" : self.scan_angle, "var_7" : self.scan_exp_time, "var_8" : HALF_PERIOD, - "var_9" : self._gmx_offset.get, + "var_9" : self.scan_gmx_offset, "var_10" : self.scan_gmy_step, } yield from self.stubs.send_rpc_and_wait("abr", "configure", d) @@ -875,3 +877,191 @@ class MeasureGrid(MxFlyscanBase): + + +class MeasureGridStill(MxFlyscanBase): + """ Still grid scan + + Note: This was probably never used in its current form, because the + code had an undefined variable 'self.position' + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_raster(positions=[[11, 2, 3.4], [11, 2, 3.5], ...], columns=20, exp_time=0.1, delay=0.05) + + Parameters + ---------- + positions : 2D-array + Scan position of each axis, i.e. (N, 3) shaped array. + columns : int + Nmber of columns. + angle: + ??? + exp_time : float + Exposure time for each point [s]. + delay: + ??? + """ + scan_name = "measure_raster_still" + required_kwargs = ["positions", "columns", "exp_time", "delay"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = AbrCmd.RASTER_SCAN_STILL + self.scan_positions = self.caller_kwargs.get("positions") + self.scan_stepnum_x = int(self.caller_kwargs.get("columns")) + self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_delay = self.caller_kwargs.get("delay") + + if len(self.scan_positions)==1: + raise RuntimeWarning("Raster scan with one cell makes no sense") + + # Good practice: new members only in __init__ + self.scan_start_x = None + self.scan_end_x = None + self.scan_xstep = 0.010 + self.scan_ystep = 0.010 + self.scan_zstep = 0.010 + self.scan_gmy_step = 0.010 + + def prepare_positions(self): + # Calculate step sizes + pos_start = self.scan_positions[0] # first cell on first row + pos_col_2nd = self.scan_positions[1] # second cell on first row + pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row + + self.scan_xstep = pos_col_2nd[0] - pos_start[0] + half_cell = abs(self.scan_xstep) / 2.0 + self.scan_start_x = pos_start[0] - half_cell + self.scan_end_x = pos_col_end[0] + half_cell + + if self.scan_stepnum_y > 1: + pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row + self.scan_ystep = pos_row_2nd[1] - pos_start[1] + self.scan_zstep = pos_row_2nd[2] - pos_start[2] + self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep]) + + # ToDo: Check with Zac what are theese parameters + self.scan_omega_position = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get") + self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get") + + # ToDo : This was left out + # self._raster_starting_x = start_x + # self._raster_starting_y = y1 + # self._raster_starting_z = z1 + # self._raster_step_y = ystep + # self._raster_step_z = zstep + # self._raster_current_row = 0 + # self._raster_num_rows = rows + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Reset the scan engine + yield from self.stubs.send_rpc_and_wait("abr", "reset") + + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_omega_position, + "var_2" : self.scan_start_x, + "var_3" : self.scan_end_x, + "var_4" : self.scan_stepnum_x, + "var_5" : self.scan_stepnum_y, + "var_6" : self.scan_delay, + "var_7" : self.scan_exp_time, + "var_8" : HALF_PERIOD, + "var_9" : self.scan_gmx_offset, + "var_10" : self.scan_gmy_step, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) + + # Call super + yield from self.stubs.pre_scan() + + + +class MeasureJungfrau(MxFlyscanBase): + """ Scan with the Jungfrau detector + + The scan itself is executed by the scan service running on the Aerotech + controller. Ophyd just configures, launches it and waits for completion. + + Example + ------- + >>> scans.measure_jungfrau(start=42, range=180, scan_time=20) + + Parameters + ---------- + columns : int + ??? + rows : int + ??? + width : int + ??? + height : int + ??? + exp_time : float + Exposure time per point [s]. + sleep : float + (default: 0.1). + """ + scan_name = "measure_jungfrau" + required_kwargs = ["columns", "rows", "width", "height", "exp_time"] + + def __init__(self, *args, parameter: dict = None, **kwargs): + super().__init__(parameter=parameter, **kwargs) + self.axis = ['omega'] + + self.scan_command = AbrCmd.JUNGFRAU + self.scan_num_cols = self.caller_kwargs.get("columns") + self.scan_num_rows = self.caller_kwargs.get("rows") + self.scan_width = self.caller_kwargs.get("width") + self.scan_height = self.caller_kwargs.get("height") + self.scan_exp_time = self.caller_kwargs.get("exp_time") + self.scan_sleep = self.caller_kwargs.get("sleep", 0.1) + + self.scan_timeout = None if self.scan_sleep <= 0 else 5.0 + (self.scan_num_rows * self.scan_sleep) + (self.scan_num_cols * self.scan_num_rows * self.scan_exp_time) + + def pre_scan(self): + # ToDo: Move roughly to start position + + # Configure the global variables + d = {"scan_command" : self.scan_command, + "var_1" : self.scan_num_cols, + "var_2" : self.scan_num_rows, + "var_3" : self.scan_width, + "var_4" : self.scan_height, + "var_5" : self.scan_exp_time, + "var_6" : self.scan_sleep, + "var_7" : 0, + "var_8" : 0, + } + yield from self.stubs.send_rpc_and_wait("abr", "configure", d) + + # Call super + yield from self.stubs.pre_scan() + + def scan_core(self): + """ The actual scan logic comes here. + """ + self.pointID = 0 + # Kick off the run + yield from self.stubs.command("abr", "start_command.set", 1) + logger.info("Measurement launched on the ABR stage...") + + # Wait for grid scanner to finish + st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.scan_timeout) + st.wait() + + # Go back to direct mode + st = yield from self.stubs.send_rpc_and_wait("abr", "set_axis_mode", "direct") + st.wait() + + +