A3200 starts to get ready for scanning

This commit is contained in:
Unknown MX Person
2024-08-28 13:23:06 +02:00
parent 13c6d7b8fb
commit 602317faa8
3 changed files with 519 additions and 753 deletions

View File

@@ -4,40 +4,27 @@
This module provides an object to control the Aerotech Abr rotational stage.
The following constants are defined.
Measurement phases:
Aerotech.ABR_DONE
Aerotech.ABR_READY
Aerotech.ABR_BUSY
Axis mode
Aerotech.DIRECT_MODE
Aerotech.MEASURING_MODE
Shutter
Aerotech.SHUTTER_OPEN
Aerotech.SHUTTER_CLOSE
Methods in the Abr class
========================
Standard bluesky interface:
AerotechAbrStage.configure(d={...})
AerotechAbrStage.kickoff()
AerotechAbrStage.stop()
Additional bluesky functionality:
Aerotech.is_homed()
Aerotech.do_homing(wait=True)
Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True)
Aerotech.is_done()
Aerotech.is_ready()
Aerotech.is_busy()
Aerotech.stop()
Aerotech.start_exposure()
Aerotech.wait_status(status)
Aerotech.set_mode(, value)
Aerotech.get_mode()
Aerotech.set_direct_mode()
Aerotech.set_measuring_mode()
Aerotech.move(angle, wait=False, speed=None)
Aerotech.set_shutter(state)
@@ -45,48 +32,38 @@ Attribute Access in Abr class
=============================
The Abr class overwrites the getattr and setattr methods to provide a pythonic
way of controlling the Abr.
way of controlling the rootation stage of the Abr.
The following properties are implemented:
velo
velocity
Sets the velocity of rotation: ``-ES-DF1:ROTX-SETV``
omega
Move the omega angle without any wait: ``-ES-DF1:ROTX-VAL``
exposure_time
exp_time
Sets the PV for the measurement's exposure time: ``-ES-OSC:ETIME``
start_angle
Sets the PV for the measurement's starting angle: ``-ES-OSC:START-POS``
oscillation_angle
Sets the PV for the measurement's oscillation angle: ``-ES-OSC:RANGE``
shutter
Controls the shutter: ``-ES-PH1:SET`` and ``-ES-PH1:GET``
mode
Controls the axis mode: ``-ES-DF1:AXES-MODE``
measurement_state
Returns the PV for the measurement state: ``-ES-OSC:DONE``
Controls the PV for the measurement state: ``-ES-OSC:DONE``
axis_mode
Returns the axis mode: ``-ES-DF1:AXES-MODE``
Examples
========
import Aerotech
abr = Aerotech.Abr()
abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
# move omega to 270.0 degrees
abr.omega = 270.0
@@ -95,7 +72,7 @@ Examples
abr.move(180, wait=True)
# move omega to 3000 degrees at 360 degrees/s and wait for movement to finish
abr.move(3000, speed=360, wait=True)
abr.move(3000, velocity=360, wait=True)
# stop any movement
abr.stop() # this function only returns after the STATUS is back to OK
@@ -198,341 +175,13 @@ class AerotechAbrStage(Device):
# A few PVs still needed from grid
raster_scan_done = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
raster_num_rows = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
# def __init__(self):
# self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS")
# self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE")
# self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME")
# self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC")
# self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START")
# self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE")
# pend_io(10)
def measure_standard(self, start, wedge, etime, ready_rate=500.0):
"""measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds"""
self.command(CMD_MEASURE_STANDARD)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(ready_rate).wait()
self.start_command()
def measure_scan_slits(self, delta_x, delta_y, velocity):
"""measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds"""
self.command(CMD_SLIT_SCAN)
self._var_1.set(delta_x).wait()
self._var_2.set(delta_y).wait()
self._var_3.set(velocity).wait()
self.start_command()
def measure_raster_simple(self, etime, cell_width, cell_height, ncols, nrows):
"""measures the specified grid assuming the goniometer is currently at the CENTER of TOP-LEFT CELL
etime: a float in seconds
cell_width: a float in mm
cell_height: a float in mm
ncols: an int
nrows: an int
"""
# self.reset()
self.command(CMD_RASTER_SCAN_SIMPLE)
self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state
self._var_1.set(etime).wait()
self._var_2.set(cell_width).wait()
self._var_3.set(cell_height).wait()
self._var_4.set(int(ncols)).wait()
self._var_5.set(int(nrows)).wait()
self._var_6.set(0).wait()
self._var_7.set(0).wait()
self._var_8.set(0).wait()
self.raster._raster_num_rows = nrows
self.start_command()
def measure_sastt(self, etime, cell_width, cell_height, ncols, nrows, even_tweak=0.0, odd_tweak=0.0, version=1):
"""measures the specified grid assuming the goniometer is currently at the CENTER of the required grid
etime: a float in seconds
cell_width: a float in mm
cell_height: a float in mm
ncols: an int
nrows: an int
odd_tweak, even_tweak: float, a distance to be added to the start of
the open shutter command. used on in the scan mode
version=3
- these values can be either positive/negative
- they should be small, i.e. just to tweak a trriggering event
by minor amounts
version: an int (1, 2, 3)
1 = original snake scan, single PSO window
2 = scan always from LEFT---RIGHT
3 = snake scan alternating PSO window for even/odd rows
"""
# self.reset()
if version == 1:
self.command(CMD_SCAN_SASTT)
elif version == 2:
self.command(CMD_SCAN_SASTT_V2)
elif version == 3:
self.command(CMD_SCAN_SASTT_V3)
else:
raise RuntimeError("non existing SAS-TT scan mode")
self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state
self._var_1.set(etime).wait()
self._var_2.set(cell_width).wait()
self._var_3.set(cell_height).wait()
self._var_4.set(int(ncols)).wait()
self._var_5.set(int(nrows)).wait()
self._var_6.set(even_tweak).wait()
self._var_7.set(odd_tweak).wait()
self._var_8.set(0).wait()
self.raster._raster_num_rows = nrows
self.start_command()
def measure_vertical_line(self, cellHeight, numCells, exposureTime):
"""measure a vertical line using the GMY motor
The line is nummCells * cellHeight long and the exposureTime is per cell.
`cellHeight` in mm
`numCells` an integer
`exposureTime` in seconds / per cellHeight
"""
self.command(CMD_VERTICAL_LINE_SCAN)
self._var_1.set(cellHeight).wait()
self._var_2.set(numCells).wait()
self._var_3.set(exposureTime).wait()
self.start_command()
def measure_screening(self, start, oscangle, etime, degrees, frames, delta=0.5):
self.command(CMD_SCREENING)
self._var_1.set(start).wait()
self._var_2.set(oscangle).wait()
self._var_3.set(etime).wait()
self._var_4.set(degrees).wait()
self._var_5.set(frames).wait()
self._var_6.set(delta).wait()
self.start_command()
def measure_fast_omega(self, start, wedge, etime, rate=200.0):
self.command(CMD_SUPER_FAST_OMEGA)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(rate).wait()
self.start_command()
def measure_still_wedge(self, start, wedge, etime, oscangle, sleep_after_shutter_close=0.010):
self.command(CMD_STILL_WEDGE)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(oscangle).wait()
self._var_5.set(sleep_after_shutter_close).wait()
self.start_command()
def measure_stills(self, number_of_frames, exposure_time, idle=0.0):
"""collect still images, i.e. no omega rotation
number_of_frames => an integer
exposure_time => a float
idle => sleep in between each exposure
IMPORTANT: use idle=0.0 to prevent shutter open/close
"""
self.command(CMD_STILLS)
self._var_1.set(number_of_frames).wait()
self._var_2.set(exposure_time).wait()
self._var_3.set(idle).wait()
self.start_command()
def measure_repeat_singles(self, frames, etime, oscangle, sleep=0.0, delta=0.5):
self.command(CMD_REPEAT_SINGLE_OSCILLATION)
self._var_1.set(frames).wait()
self._var_2.set(etime).wait()
self._var_3.set(oscangle).wait()
self._var_4.set(sleep).wait()
self._var_5.set(delta).wait()
self.start_command()
def measure_single(self, start_angle, oscillation_angle, exposure_time, delta=0.0, settle=0.0):
self.command(CMD_SINGLE_OSCILLATION)
self._var_1.set(start_angle).wait()
self._var_2.set(oscillation_angle).wait()
self._var_3.set(exposure_time).wait()
self._var_4.set(delta).wait()
self._var_5.set(settle).wait()
self.start_command()
def measure_raster(self, positions, columns, angle, etime):
"""raster via generic experiment interface"""
self.reset()
if len(positions) == 1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
self.command(CMD_RASTER_SCAN)
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self._var_1.set(self.position).wait()
self._var_2.set(start_x).wait()
self._var_3.set(end_x).wait()
self._var_4.set(columns).wait()
self._var_5.set(rows).wait()
self._var_6.set(angle).wait()
self._var_7.set(etime).wait()
self._var_8.set(HALF_PERIOD).wait()
self._var_9.set(self.gmx.offset.get()).wait()
self._var_10.set(gmy_step).wait()
self.start_command()
def measure_raster_still(self, positions, columns, etime, delay):
"""raster via generic experiment interface
positions: list of coordinates for each cell
columns: how many columns in grid
etime: exposure time / cell_width
delay: delay configured in delay generator
to trigger detector after aerotech;
must be greater than actual shutter delay
"""
self.reset()
if len(positions) == 1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
self.command(CMD_RASTER_SCAN_STILL)
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self._var_1.set(self.position).wait()
self._var_2.set(start_x).wait()
self._var_3.set(end_x).wait()
self._var_4.set(columns).wait()
self._var_5.set(rows).wait()
self._var_6.set(delay).wait()
self._var_7.set(etime).wait()
self._var_8.set(HALF_PERIOD).wait()
self._var_9.set(self.gmx.offset.get()).wait()
self._var_10.set(gmy_step).wait()
self.start_command()
def measure_jungfrau(self, columns, rows, width, height, etime, sleep=0.100):
"""for Jungfrau, grid from current position"""
self.reset()
self.command(CMD_JUNGFRAU)
self._var_1.set(columns).wait()
self._var_2.set(rows).wait()
self._var_3.set(height).wait()
self._var_4.set(width).wait()
self._var_5.set(etime).wait()
self._var_6.set(sleep).wait()
self._var_7.set(0.0).wait()
self._var_8.set(0.0).wait()
self._var_9.set(0.0).wait()
self._var_10.set(0.0).wait()
self.start_command()
timeout = None if sleep <= 0 else 5.0 + (rows * sleep) + (columns * rows * etime)
if timeout:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.raster.scan_done, isReady, timeout=timeout, settle_time=0.5)
status.wait()
self.set_direct_mode()
def measure_msox(self, start, wedge, etime, num_datasets=1, rest_time=0.0):
"""for msox
start = start omega
wedge = total range to measure
etime = total time for wedge
num_datasets = how many times to repeat
rest_time = how many seconds to rest in between datasets
"""
self.reset()
self.command(CMD_MSOX)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(num_datasets).wait()
self._var_5.set(rest_time).wait()
self._var_6.set(0).wait()
self._var_7.set(0).wait()
self._var_8.set(0).wait()
self._var_9.set(0).wait()
self._var_10.set(0).wait()
self.start_command()
raster_num_rows = Component(EpicsSignal, "-ES-GRD:ROW-DONE", kind=Kind.config)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
if mode=="direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode=="measuring":
self.axisAxesMode.set(MEASURING_MODE, settle_time=settle_time).wait()
def configure(self, d: dict) -> tuple:
@@ -572,9 +221,9 @@ class AerotechAbrStage(Device):
"""
return self.raster.complete()
def reset(self):
self.command(CMD_NONE)
self.set_direct_mode()
def reset(self, settle_time=0.1):
self.scan_command.set(CMD_NONE, settle_time=settle_time).wait()
self.set_axis_mode("direct", settle_time=settle_time)
def is_ioc_ok(self):
return 0 == self.status.get()
@@ -597,27 +246,27 @@ class AerotechAbrStage(Device):
PARAMETERS
`wait` true / false if the routine is to wait for the homing to finish.
"""
self.omega.home.set(1).wait()
self.omega.home.set(1, settle_time=1).wait()
poll(1.0)
if not wait:
return
while not self.omega.is_homed():
poll(0.2)
time.sleep(0.2)
@property
def velo(self):
def velocity(self):
return self.omega.velocity.get()
@velo.setter
def velo(self, value):
@velocity.setter
def velocity(self, value):
self.omega.velocity.set(value).wait()
@property
def etime(self):
return self.osc.etime.get()
def exp_time(self):
return self.osc.exp_time.get()
@etime.setter
def etime(self, value):
@exp_time.setter
def exp_time(self, value):
self.osc.etime.set(value).wait()
@property
@@ -657,17 +306,9 @@ class AerotechAbrStage(Device):
return state == self._shutter.get()
@property
def mode(self):
def axis_mode(self):
return self.axisAxesMode.get()
@mode.setter
def mode(self, value):
self.axisAxesMode.set(value).wait()
def get_ready(self, ostart=None, orange=None, etime=None, wait=True):
self.wait_for_movements()
if self.measurement_state == ABR_BUSY:
@@ -683,9 +324,7 @@ class AerotechAbrStage(Device):
if etime is not None:
self.osc.etime.set(etime).wait()
self.osc.get_ready.set(1).wait()
poll(0.1)
self.osc.get_ready.set(1, settle_time=0.1).wait()
if wait:
for n in range(10):
@@ -707,7 +346,7 @@ class AerotechAbrStage(Device):
moving = True
while not timeisup and moving:
poll(0.1)
time.sleep(0.1)
try:
moving = self.is_moving()
except:
@@ -731,10 +370,9 @@ class AerotechAbrStage(Device):
couple of seconds.
"""
self.taskStop.put(1, wait=True)
poll(wait_after_reload)
self.taskStop.set(1, settle_time=wait_after_reload).wait()
self.reset()
poll(0.1)
time.sleep(0.1)
reload_programs = stop
@@ -746,79 +384,16 @@ class AerotechAbrStage(Device):
try:
self.osc.wait_status(ABR_BUSY, timeout=1)
except RuntimeWarning as e:
print(str(e), end=" ")
print(" --- trying start again.")
self.osc.taskStart.set(1).wait()
print("%s --- trying start again.", str(e))
self.osc.kickoff()
def move(self, position, wait=False, velocity=None, relative=None, direct=False, **kwargs) -> MoveStatus:
"""Move omega to `angle` at `speed` deg/s and `wait` (or not).
If a `speed` is specified, it will be used for the movement but the speed
setting will be reverted to its old value afterwards.
If `wait` is true, the routine waits until the omega position is close
to within 0.01 degrees to the target or that a timeout occured. The
timeout is the time required for the movement plus 5 seconds.
In case of a timeout a RuntimeWarning is raised.
PARAMETERS:
`position` [degrees] a float specifying where to move to.
No limits.
`velo` [degrees/s] a float specifying speed to move at.
Limits: 0 - 360
`wait` [boolean] wait or not for movement to finish.
""" Move the omega axis
"""
return self.omega.move(position, wait=wait, velocity=velocity, relative=relative, direct=direct, **kwargs)
def set_shutter(self, state):
if self.axisAxesMode.get():
print("ABR is not in direct mode; cannot manipulate shutter")
return False
state = str(state).lower()
if state not in ["1", "0", "closed", "open"]:
print("unknown shutter state requested")
return None
elif state in ["1", "open"]:
state = 1
elif state == ["0", "closed"]:
state = 0
self._shutter.put(state, wait=True)
return state == self._shutter.get()
if __name__ == "__main__":
aero = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
aero.wait_for_connection()
# import argparse
# from time import sleep
# parser = argparse.ArgumentParser(description="aerotech client")
# parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true")
# parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true")
# parser.add_argument("--measure-standard", help="measure a standard dataset", action="store")
# parser.add_argument("--omega", help="move omega to...", action="store")
# parser.add_argument("--gmx", help="move gmx to...", action="store")
# parser.add_argument("--gmy", help="move gmy to...", action="store")
# parser.add_argument("--gmz", help="move gmz to...", action="store")
# args = parser.parse_args()
# abr = Abr()
# if args.stop:
# abr.stop()
# elif args.sanitize_speeds:
# abr.velo(AXIS_GMX, 50.0)
# abr.velo(AXIS_GMY, 10.0)
# abr.velo(AXIS_GMZ, 10.0)
# abr.velo(AXIS_OMEGA, 180.0)
# elif args.gmx:
# abr.move_x(args.gmx)
# elif args.measure_standard:
# start_angle, total_range, total_time = args.measure_standard.split(",")
# abr.measure_standard(start_angle, total_range, total_time)
abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
abr.wait_for_connection()

View File

@@ -343,10 +343,8 @@ class A3200RasterScanner(Device):
class A3200Oscillator(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
"""No clue what this does, seems to be redundant with the task based grid scanners...
"""
ostart_pos = Component(EpicsSignal, "-ES-OSC:START-POS", put_complete=True, kind=Kind.config)
orange = Component(EpicsSignal, "-ES-OSC:RANGE", put_complete=True, kind=Kind.config)
@@ -367,6 +365,9 @@ class A3200Oscillator(Device):
def is_busy(self):
return ABR_BUSY == self.phase.get()
def kickoff(self):
self.osc.taskStart.set(1).wait()
def wait_status(self, target, timeout=60.0) -> SubscriptionStatus:
"""Wait for the Aertotech IOC to reach the desired `status`.

File diff suppressed because it is too large Load Diff