Still a lot to do

This commit is contained in:
2024-07-16 14:32:09 +02:00
parent 4a6e4092ca
commit e7fd8e453d
3 changed files with 1169 additions and 5 deletions

View File

@@ -33,7 +33,7 @@ dccm_pitch1:
softwareTrigger: false
dccm_energy1:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_ENERGY1'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -57,7 +57,7 @@ dccm_pitch2:
softwareTrigger: false
dccm_energy2:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_ENERGY2'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY2'}
onFailure: buffer
enabled: true
readoutPriority: monitored

884
pxiii_bec/devices/A3200.py Normal file
View File

@@ -0,0 +1,884 @@
"""
``Aerotech`` --- Aerotech control software
******************************************
This module provides an object to control the Aerotech Abr rotational stage.
The following constants are defined.
Measurement phases:
Aerotech.ABR_DONE
Aerotech.ABR_READY
Aerotech.ABR_BUSY
Axis mode
Aerotech.DIRECT_MODE
Aerotech.MEASURING_MODE
Shutter
Aerotech.SHUTTER_OPEN
Aerotech.SHUTTER_CLOSE
Methods in the Abr class
========================
Aerotech.is_homed()
Aerotech.do_homing(wait=True)
Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True)
Aerotech.is_done()
Aerotech.is_ready()
Aerotech.is_busy()
Aerotech.stop()
Aerotech.start_exposure()
Aerotech.wait_status(status)
Aerotech.set_mode(, value)
Aerotech.get_mode()
Aerotech.set_direct_mode()
Aerotech.set_measuring_mode()
Aerotech.move(angle, wait=False, speed=None)
Aerotech.set_shutter(state)
Attribute Access in Abr class
=============================
The Abr class overwrites the getattr and setattr methods to provide a pythonic
way of controlling the Abr.
The following properties are implemented:
velo
Sets the velocity of rotation: ``-ES-DF1:ROTX-SETV``
omega
Move the omega angle without any wait: ``-ES-DF1:ROTX-VAL``
exposure_time
Sets the PV for the measurement's exposure time: ``-ES-OSC:ETIME``
start_angle
Sets the PV for the measurement's starting angle: ``-ES-OSC:START-POS``
oscillation_angle
Sets the PV for the measurement's oscillation angle: ``-ES-OSC:RANGE``
shutter
Controls the shutter: ``-ES-PH1:SET`` and ``-ES-PH1:GET``
mode
Controls the axis mode: ``-ES-DF1:AXES-MODE``
measurement_state
Controls the PV for the measurement state: ``-ES-OSC:DONE``
Examples
========
import Aerotech
abr = Aerotech.Abr()
# move omega to 270.0 degrees
abr.omega = 270.0
# move omega to 180 degrees and wait for movement to finish
abr.move(180, wait=True)
# move omega to 3000 degrees at 360 degrees/s and wait for movement to finish
abr.move(3000, speed=360, wait=True)
# stop any movement
abr.stop() # this function only returns after the STATUS is back to OK
"""
import time
import math
from beamline import BEAMLINE
from epics import PV, poll
from epics.ca import pend_io
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status
class A3200Axis(PVPositioner):
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1
DIRECT_MODE = 0
MEASURING_MODE = 1
SHUTTER_CLOSE = 0
SHUTTER_OPEN = 1
FULL_PERIOD = 0
HALF_PERIOD = 1
def pv_wait(pv, val, timeout=10.0):
timeout = timeout + time.time()
timeisup = False
while val != pv.get() and not timeisup:
poll(0.01)
timeisup = time.time() > timeout
if timeisup:
raise RuntimeWarning("timeout waiting for %s to reach %s" % (pv.pvname, str(val)))
CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2
CMD_VERTICAL_LINE_SCAN = 3
CMD_SCREENING = 4
CMD_SUPER_FAST_OMEGA = 5
CMD_STILL_WEDGE = 6
CMD_STILLS = 7
CMD_REPEAT_SINGLE_OSCILLATION = 8
CMD_SINGLE_OSCILLATION = 9
CMD_OLD_FASHIONED = 10
CMD_RASTER_SCAN = 11
CMD_JET_ROTATION = 12
CMD_X_HELICAL = 13
CMD_X_RUNSEQ = 14
CMD_JUNGFRAU = 15
CMD_MSOX = 16
CMD_SLIT_SCAN = 17
CMD_RASTER_SCAN_STILL = 18
CMD_SCAN_SASTT = 19
CMD_SCAN_SASTT_V2 = 20
CMD_SCAN_SASTT_V3 = 21
AXIS_OMEGA = 1
AXIS_GMX = 2
AXIS_GMY = 3
AXIS_GMZ = 4
AXIS_STY = 5
AXIS_STZ = 6
class AerotechAbrStage(Device):
taskStop = Component(EpicsSignal, "-ES-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-ES-AERO:STAT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component(EpicsSignal, "-ES-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component(EpicsSignal, "-ES-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted)
axisAxesMode = Component(EpicsSignal, "-ES-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
_shutter = Component(
EpicsSignal, "-ES-PH1:GET", write_pv="-ES-PH1:SET", put_complete=True, kind=Kind.config,
)
raster = Component(A3200RasterScanner, "", kind=Kind.config)
osc = Component(A3200Oscillator, "", kind=Kind.config)
omega = Component(A3200Axis, "-ES-DF1:OMEGA", kind=Kind.hinted)
gmx = Component(A3200Axis, "-ES-DF1:GMX", kind=Kind.hinted)
gmy = Component(A3200Axis, "-ES-DF1:GMY", kind=Kind.hinted)
gmz = Component(A3200Axis, "-ES-DF1:GMZ", kind=Kind.hinted)
_zcmd = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
_stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
_var_1 = Component(EpicsSignal, "-ES-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
_var_2 = Component(EpicsSignal, "-ES-PSO:VAR-2", put_complete=True, kind=Kind.omitted)
_var_3 = Component(EpicsSignal, "-ES-PSO:VAR-3", put_complete=True, kind=Kind.omitted)
_var_4 = Component(EpicsSignal, "-ES-PSO:VAR-4", put_complete=True, kind=Kind.omitted)
_var_5 = Component(EpicsSignal, "-ES-PSO:VAR-5", put_complete=True, kind=Kind.omitted)
_var_6 = Component(EpicsSignal, "-ES-PSO:VAR-6", put_complete=True, kind=Kind.omitted)
_var_7 = Component(EpicsSignal, "-ES-PSO:VAR-7", put_complete=True, kind=Kind.omitted)
_var_8 = Component(EpicsSignal, "-ES-PSO:VAR-8", put_complete=True, kind=Kind.omitted)
_var_9 = Component(EpicsSignal, "-ES-PSO:VAR-9", put_complete=True, kind=Kind.omitted)
_var_10 = Component(EpicsSignal, "-ES-PSO:VAR-10", put_complete=True, kind=Kind.omitted)
# A few PVs still needed from grid
raster_scan_done = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
raster_num_rows = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
def __init__(self):
self.__pv_ostart = PV(f"{BEAMLINE}-ES-OSC:START-POS")
self.__pv_orange = PV(f"{BEAMLINE}-ES-OSC:RANGE")
self.__pv_etime = PV(f"{BEAMLINE}-ES-OSC:ETIME")
self.__pv_get_ready = PV(f"{BEAMLINE}-ES-OSC:READY.PROC")
self.__pv_start = PV(f"{BEAMLINE}-ES-OSC:START")
self.__pv_phase = PV(f"{BEAMLINE}-ES-OSC:DONE")
pend_io(10)
def command(self, cmd):
self._zcmd.set(cmd).wait()
def start_command(self):
self._start_command.set(1).wait()
def measure_standard(self, start, wedge, etime, ready_rate=500.0):
"""measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds"""
self.command(CMD_MEASURE_STANDARD)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(ready_rate).wait()
self.start_command()
def measure_scan_slits(self, delta_x, delta_y, velocity):
"""measure a standard wedge from `start` to `start` + `wedge` during `etime` seconds"""
self.command(CMD_SLIT_SCAN)
self._var_1.set(delta_x).wait()
self._var_2.set(delta_y).wait()
self._var_3.set(velocity).wait()
self.start_command()
def measure_raster_simple(self, etime, cell_width, cell_height, ncols, nrows):
"""measures the specified grid assuming the goniometer is currently at the CENTER of TOP-LEFT CELL
etime: a float in seconds
cell_width: a float in mm
cell_height: a float in mm
ncols: an int
nrows: an int
"""
# self.reset()
self.command(CMD_RASTER_SCAN_SIMPLE)
self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state
self._var_1.set(etime).wait()
self._var_2.set(cell_width).wait()
self._var_3.set(cell_height).wait()
self._var_4.set(int(ncols)).wait()
self._var_5.set(int(nrows)).wait()
self._var_6.set(0).wait()
self._var_7.set(0).wait()
self._var_8.set(0).wait()
self.raster._raster_num_rows = nrows
self.start_command()
def measure_sastt(self, etime, cell_width, cell_height, ncols, nrows, even_tweak=0.0, odd_tweak=0.0, version=1):
"""measures the specified grid assuming the goniometer is currently at the CENTER of the required grid
etime: a float in seconds
cell_width: a float in mm
cell_height: a float in mm
ncols: an int
nrows: an int
odd_tweak, even_tweak: float, a distance to be added to the start of
the open shutter command. used on in the scan mode
version=3
- these values can be either positive/negative
- they should be small, i.e. just to tweak a trriggering event
by minor amounts
version: an int (1, 2, 3)
1 = original snake scan, single PSO window
2 = scan always from LEFT---RIGHT
3 = snake scan alternating PSO window for even/odd rows
"""
# self.reset()
if version == 1:
self.command(CMD_SCAN_SASTT)
elif version == 2:
self.command(CMD_SCAN_SASTT_V2)
elif version == 3:
self.command(CMD_SCAN_SASTT_V3)
else:
raise RuntimeError("non existing SAS-TT scan mode")
self.raster.scan_done.set(0).wait() # make SCAN DONE go into busy state
self._var_1.set(etime).wait()
self._var_2.set(cell_width).wait()
self._var_3.set(cell_height).wait()
self._var_4.set(int(ncols)).wait()
self._var_5.set(int(nrows)).wait()
self._var_6.set(even_tweak).wait()
self._var_7.set(odd_tweak).wait()
self._var_8.set(0).wait()
self.raster._raster_num_rows = nrows
self.start_command()
def measure_vertical_line(self, cellHeight, numCells, exposureTime):
"""measure a vertical line using the GMY motor
The line is nummCells * cellHeight long and the exposureTime is per cell.
`cellHeight` in mm
`numCells` an integer
`exposureTime` in seconds / per cellHeight
"""
self.command(CMD_VERTICAL_LINE_SCAN)
self._var_1.set(cellHeight).wait()
self._var_2.set(numCells).wait()
self._var_3.set(exposureTime).wait()
self.start_command()
def measure_screening(self, start, oscangle, etime, degrees, frames, delta=0.5):
self.command(CMD_SCREENING)
self._var_1.set(start).wait()
self._var_2.set(oscangle).wait()
self._var_3.set(etime).wait()
self._var_4.set(degrees).wait()
self._var_5.set(frames).wait()
self._var_6.set(delta).wait()
self.start_command()
def measure_fast_omega(self, start, wedge, etime, rate=200.0):
self.command(CMD_SUPER_FAST_OMEGA)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(rate).wait()
self.start_command()
def measure_still_wedge(self, start, wedge, etime, oscangle, sleep_after_shutter_close=0.010):
self.command(CMD_STILL_WEDGE)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(oscangle).wait()
self._var_5.set(sleep_after_shutter_close).wait()
self.start_command()
def measure_stills(self, number_of_frames, exposure_time, idle=0.0):
"""collect still images, i.e. no omega rotation
number_of_frames => an integer
exposure_time => a float
idle => sleep in between each exposure
IMPORTANT: use idle=0.0 to prevent shutter open/close
"""
self.command(CMD_STILLS)
self._var_1.set(number_of_frames).wait()
self._var_2.set(exposure_time).wait()
self._var_3.set(idle).wait()
self.start_command()
def measure_repeat_singles(self, frames, etime, oscangle, sleep=0.0, delta=0.5):
self.command(CMD_REPEAT_SINGLE_OSCILLATION)
self._var_1.set(frames).wait()
self._var_2.set(etime).wait()
self._var_3.set(oscangle).wait()
self._var_4.set(sleep).wait()
self._var_5.set(delta).wait()
self.start_command()
def measure_single(self, start_angle, oscillation_angle, exposure_time, delta=0.0, settle=0.0):
self.command(CMD_SINGLE_OSCILLATION)
self._var_1.set(start_angle).wait()
self._var_2.set(oscillation_angle).wait()
self._var_3.set(exposure_time).wait()
self._var_4.set(delta).wait()
self._var_5.set(settle).wait()
self.start_command()
def measure_raster(self, positions, columns, angle, etime):
"""raster via generic experiment interface"""
self.reset()
if len(positions) == 1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
self.command(CMD_RASTER_SCAN)
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self._var_1.set(self.position).wait()
self._var_2.set(start_x).wait()
self._var_3.set(end_x).wait()
self._var_4.set(columns).wait()
self._var_5.set(rows).wait()
self._var_6.set(angle).wait()
self._var_7.set(etime).wait()
self._var_8.set(HALF_PERIOD).wait()
self._var_9.set(self.gmx.offset.get()).wait()
self._var_10.set(gmy_step).wait()
self.start_command()
def measure_raster_still(self, positions, columns, etime, delay):
"""raster via generic experiment interface
positions: list of coordinates for each cell
columns: how many columns in grid
etime: exposure time / cell_width
delay: delay configured in delay generator
to trigger detector after aerotech;
must be greater than actual shutter delay
"""
self.reset()
if len(positions) == 1:
raise RuntimeWarning("Raster scan with one cell makes no sense")
self.command(CMD_RASTER_SCAN_STILL)
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self._var_1.set(self.position).wait()
self._var_2.set(start_x).wait()
self._var_3.set(end_x).wait()
self._var_4.set(columns).wait()
self._var_5.set(rows).wait()
self._var_6.set(delay).wait()
self._var_7.set(etime).wait()
self._var_8.set(HALF_PERIOD).wait()
self._var_9.set(self.gmx.offset.get()).wait()
self._var_10.set(gmy_step).wait()
self.start_command()
def measure_jungfrau(self, columns, rows, width, height, etime, sleep=0.100):
"""for Jungfrau, grid from current position"""
self.reset()
self.command(CMD_JUNGFRAU)
self._var_1.set(columns).wait()
self._var_2.set(rows).wait()
self._var_3.set(height).wait()
self._var_4.set(width).wait()
self._var_5.set(etime).wait()
self._var_6.set(sleep).wait()
self._var_7.set(0.0).wait()
self._var_8.set(0.0).wait()
self._var_9.set(0.0).wait()
self._var_10.set(0.0).wait()
self.start_command()
timeout = None if sleep <= 0 else 5.0 + (rows * sleep) + (columns * rows * etime)
if timeout:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.raster.scan_done, isReady, timeout=timeout, settle_time=0.5)
status.wait()
self.set_direct_mode()
def measure_msox(self, start, wedge, etime, num_datasets=1, rest_time=0.0):
"""for msox
start = start omega
wedge = total range to measure
etime = total time for wedge
num_datasets = how many times to repeat
rest_time = how many seconds to rest in between datasets
"""
self.reset()
self.command(CMD_MSOX)
self._var_1.set(start).wait()
self._var_2.set(wedge).wait()
self._var_3.set(etime).wait()
self._var_4.set(num_datasets).wait()
self._var_5.set(rest_time).wait()
self._var_6.set(0).wait()
self._var_7.set(0).wait()
self._var_8.set(0).wait()
self._var_9.set(0).wait()
self._var_10.set(0).wait()
self.start_command()
def configure_stills(self, frames, etime, sleep=0.0):
self._var_1.set(frames).wait()
self._var_2.set(etime).wait()
self._var_3.set(sleep).wait()
def reset(self):
self.command(CMD_NONE)
self.set_direct_mode()
def raster_2d(self):
pass
def is_ioc_ok(self):
return 0 == self.status.get()
def is_ready_and_willing(self):
return self.is_ioc_ok() and self.omega.is_homed()
def is_omega_ok(self):
return 0 == self.self.omega.status.get()
def is_homed(self):
"""Are the motors homed?"""
return 1 == self.omega.is_homed.get()
def do_homing(self, wait=True):
"""Execute the homing procedure.
Executes the homing procedure and waits (default) until it is completed.
PARAMETERS
`wait` true / false if the routine is to wait for the homing to finish.
"""
self.omega.home.set(1).wait()
poll(1.0)
if not wait:
return
while not self.omega.is_homed():
poll(0.2)
@property
def velo(self):
return self.omega.velocity.get()
@velo.setter
def velo(self, value):
self.omega.velocity.set(value).wait()
@property
def etime(self):
return self.osc.etime.get()
@etime.setter
def etime(self, value):
self.osc.etime.set(value).wait()
@property
def start_angle(self):
return self.osc.ostart_pos.get()
@start_angle.setter
def start_angle(self, value):
self.osc.ostart_pos(value).wait()
@property
def measurement_state(self):
return self.osc.phase.get()
@measurement_state.setter
def measurement_state(self, value):
self.osc.phase.set(value).wait()
@property
def shutter(self):
return self._shutter.get()
@shutter.setter
def shutter(self, value):
if self.axisAxesMode.get():
print("ABR is not in direct mode; cannot manipulate shutter")
return False
state = str(state).lower()
if state not in ["1", "0", "closed", "open"]:
print("unknown shutter state requested")
return None
elif state in ["1", "open"]:
state = 1
elif state == ["0", "closed"]:
state = 0
self._shutter.set(state).wait()
return state == self._shutter.get()
@property
def mode(self):
return self.axisAxesMode.get()
@mode.setter
def mode(self, value):
self.axisAxesMode.set(value).wait()
def get_ready(self, ostart=None, orange=None, etime=None, wait=True):
self.wait_for_movements()
if self.measurement_state == ABR_BUSY:
raise RuntimeError("ABR is not DONE!!!!")
if self.measurement_state == ABR_READY:
self.measurement_state = ABR_DONE
if ostart is not None:
self.osc.ostart.set(ostart).wait()
if orange is not None:
self.osc.orange.set(orange).wait()
if etime is not None:
self.osc.etime.set(etime).wait()
self.osc.get_ready.set(1).wait()
poll(0.1)
if wait:
for n in range(10):
try:
self.wait_status(ABR_READY, timeout=5).wait()
except RuntimeWarning as e:
print(str(e), end=" ")
print(" --- trying ready again.")
self.osc.get_ready.set(1).wait()
def wait_for_movements(self, timeout=60.0):
timeout = timeout + time.time()
timeisup = False
try:
moving = self.is_moving()
except:
moving = True
while not timeisup and moving:
poll(0.1)
try:
moving = self.is_moving()
except:
print("Aerotech() Failed to retrieve moving state for axes omega, gmx, gmy, gmz")
moving = True
timeisup = timeout < time.time()
if timeisup:
raise RuntimeWarning("timeout waiting for all axis to stop moving")
def is_moving(self):
return (
self.omega.moving
and self.gmx.moving and self.gmy.moving and self.gmz.moving )
def stop(self, wait_after_reload=1.0):
"""Stops current motions; reloads aerobasic programs; goes DIRECT
This will stop movements in both DIRECT and MEASURING modes. During the
stop the `status` temporarely goes to ERROR but reverts to OK after a
couple of seconds.
"""
self.taskStop.put(1, wait=True)
poll(wait_after_reload)
self.reset()
poll(0.1)
reload_programs = stop
def start_exposure(self):
"""Starts the previously configured exposure."""
self.wait_for_movements()
self.osc.taskStart.set(1).wait()
for n in range(10):
try:
self.osc.wait_status(ABR_BUSY, timeout=1)
except RuntimeWarning as e:
print(str(e), end=" ")
print(" --- trying start again.")
self.osc.taskStart.set(1).wait()
def velo(self, axis: int, speed: float = None) -> float:
"""set's the speed on the given axis to speed"""
if AXIS_GMX == axis:
velo_pv = self.__pv_gmx_velo
min, max = 0.01, 100.0
elif AXIS_GMY == axis:
velo_pv = self.__pv_gmy_velo
min, max = 0.01, 10.0
elif AXIS_GMZ == axis:
velo_pv = self.__pv_gmz_velo
min, max = 0.01, 10.0
elif AXIS_OMEGA == axis:
velo_pv = self.__pv_omega_velo
min, max = 0.01, 720.0
else:
raise Exception("unknown axis index")
if speed is not None:
if not (min <= speed <= max):
raise Exception(f"requested speed {speed} outside range {min} < speed < {max}")
velo_pv.put(speed)
poll(0.05)
return velo_pv.get()
def select_axis(self, axis):
"""returns PVs and limits for the given axis
axis: Aerotech.AXIS_{OMEGA=1, GMX=2, GMY=3, GMZ=4}
return (val_pv, rbv_pv, velo_pv, done_pv, velomin, velomax)
"""
if AXIS_GMX == axis:
val_pv = self.__pv_gmx_val
rbv_pv = self.__pv_gmx_rbv
velo_pv = self.__pv_gmx_velo
done_pv = self.__pv_gmx_done
min, max = 0.01, 100.0
elif AXIS_GMY == axis:
val_pv = self.__pv_gmy_val
rbv_pv = self.__pv_gmy_rbv
velo_pv = self.__pv_gmy_velo
done_pv = self.__pv_gmy_done
min, max = 0.01, 10.0
elif AXIS_GMZ == axis:
val_pv = self.__pv_gmz_val
rbv_pv = self.__pv_gmz_rbv
velo_pv = self.__pv_gmz_velo
done_pv = self.__pv_gmz_done
min, max = 0.01, 10.0
elif AXIS_OMEGA == axis:
val_pv = self.__pv_omega_set
rbv_pv = self.__pv_omega_get
velo_pv = self.__pv_omega_velo
done_pv = self.__pv_omega_done
min, max = 0.01, 720.0
else:
raise Exception("unknown axis index")
return (val_pv, rbv_pv, velo_pv, done_pv, min, max)
def rbv(self):
x = self.__pv_gmx_rbv.get()
y = self.__pv_gmy_get.get()
z = self.__pv_gmz_get.get()
omega = self.__pv_omega_get.get()
return {"gmx": x, "gmy": y, "gmz": z, "omega": omega}
def move(self, position, wait=False, velo=None, direct=False, **kwargs) -> StatusBase:
"""Move omega to `angle` at `speed` deg/s and `wait` (or not).
If a `speed` is specified, it will be used for the movement but the speed
setting will be reverted to its old value afterwards.
If `wait` is true, the routine waits until the omega position is close
to within 0.01 degrees to the target or that a timeout occured. The
timeout is the time required for the movement plus 5 seconds.
In case of a timeout a RuntimeWarning is raised.
PARAMETERS:
`position` [degrees] a float specifying where to move to.
No limits.
`velo` [degrees/s] a float specifying speed to move at.
Limits: 0 - 360
`wait` [boolean] wait or not for movement to finish.
"""
return self.omega.move(position, wait=wait, velo=velo, direct=direct, **kwargs)
def set_shutter(self, state):
if self.__pv_axes_mode.get():
print("ABR is not in direct mode; cannot manipulate shutter")
return False
state = str(state).lower()
if state not in ["1", "0", "closed", "open"]:
print("unknown shutter state requested")
return None
elif state in ["1", "open"]:
state = 1
elif state == ["0", "closed"]:
state = 0
self.__pv_shutter_set.put(state, wait=True)
return state == self.__pv_shutter_get.get()
if __name__ == "__main__":
import argparse
from time import sleep
parser = argparse.ArgumentParser(description="aerotech client")
parser.add_argument("--stop", help="stop/reset/restart aerotech", action="store_true")
parser.add_argument("--sanitize-speeds", help="reset speeds on axis to sane defaults", action="store_true")
parser.add_argument("--measure-standard", help="measure a standard dataset", action="store")
parser.add_argument("--omega", help="move omega to...", action="store")
parser.add_argument("--gmx", help="move gmx to...", action="store")
parser.add_argument("--gmy", help="move gmy to...", action="store")
parser.add_argument("--gmz", help="move gmz to...", action="store")
args = parser.parse_args()
abr = Abr()
if args.stop:
abr.stop()
elif args.sanitize_speeds:
abr.velo(AXIS_GMX, 50.0)
abr.velo(AXIS_GMY, 10.0)
abr.velo(AXIS_GMZ, 10.0)
abr.velo(AXIS_OMEGA, 180.0)
elif args.gmx:
abr.move_x(args.gmx)
elif args.measure_standard:
start_angle, total_range, total_time = args.measure_standard.split(",")
abr.measure_standard(start_angle, total_range, total_time)

View File

@@ -5,7 +5,7 @@ Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
from ophyd import Component, PVPositioner, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Component, PVPositioner, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status
@@ -17,14 +17,17 @@ class A3200Axis(PVPositioner):
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
#setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velo = Component(EpicsSignal, "-SETV", kind=Kind.config)
error = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
# PV to issue native relative movements on the A3200
relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config)
# PV to home axis
home = Component(EpicsSignal, "-HOME", kind=Kind.config)
ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config)
# HW status words
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.hinted)
@@ -38,11 +41,49 @@ class A3200Axis(PVPositioner):
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
#_rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
def rmove(self, distance, wait=True, timeout=None, moved_cb=None) -> Status:
hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config)
vmin = Component(Signal, kind=Kind.config)
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs):
""" __init__ MUST have a full argument list"""
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def move(self, position, wait=True, velo=None, direct=False, **kwargs):
""" Native absolute movement on the A3200 """
if self.parent is not None:
if self.parent.mode != DIRECT_MODE:
if direct:
self.parent.set_direct_mode()
else:
raise RuntimeError("ABR is not in direct mode!")
if velo is not None:
self.velo.set(velo).wait()
return super().move(position, wait=wait, **kwargs)
def rmove(self, distance, wait=True, velo=None, timeout=None, moved_cb=None) -> Status:
""" Native relative movement on the A3200 """
# Before moving, ensure we can stop (if a stop_signal is configured).
if self.stop_signal is not None:
self.stop_signal.wait_for_connection()
if velo is not None:
self.velo.set(velo).wait()
# Issue move command
status = super().move(position=distance, timeout=timeout, moved_cb=moved_cb)
has_done = self.done is not None
@@ -79,6 +120,245 @@ class A3200Axis(PVPositioner):
return status
class A3200RasterScanner(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
x_start = Component(EpicsSignal, "-ES-GRD:GMX-START", kind=Kind.config)
x_start = Component(EpicsSignal, "-ES-GRD:GMX-END", kind=Kind.config)
omega = Component(EpicsSignal, "-ES-OSC:#M-START", kind=Kind.config)
osc = Component(EpicsSignal, "-ES-GRD:ANGLE", kind=Kind.config)
celltime = Component(EpicsSignal, "-ES-GRD:CELL-TIME", kind=Kind.config)
y_start = Component(EpicsSignal, "-ES-OSC:#STY-START", kind=Kind.config)
y_end = Component(EpicsSignal, "-ES-OSC:#STY-END", kind=Kind.config)
z_start = Component(EpicsSignal, "-ES-OSC:#STZ-START", kind=Kind.config)
z_end = Component(EpicsSignal, "-ES-OSC:#STZ-END", kind=Kind.config)
columns = Component(EpicsSignal, "-ES-GRD:COLUMNS", kind=Kind.config)
rows = Component(EpicsSignal, "-ES-GRD:ROWS", kind=Kind.config)
delay = Component(EpicsSignal, "-ES-GRD:RAST-DLY", kind=Kind.config)
osc_mode = Component(EpicsSignal, "-ES-GRD:SET-MODE", kind=Kind.config)
velo = Component(EpicsSignal, "-ES-GRD:#X-VEL", kind=Kind.config)
get_ready = Component(EpicsSignal, "-ES-GRD:READY.PROC", kind=Kind.config)
grid_start = Component(EpicsSignal, "-ES-GRD:START.PROC", kind=Kind.config)
grid_next = Component(EpicsSignal, "-ES-GRD:NEXT-ROW", kind=Kind.config)
row_done = Component(EpicsSignal, "-ES-GRD:ROW-DONE", kind=Kind.config)
scan_done = Component(EpicsSignal, "-ES-GRD:SCAN-DONE", kind=Kind.config)
grid_done = Component(EpicsSignal, "-ES-GRD:DONE", kind=Kind.config)
# Also needs the command interface
_zcmd = Component(EpicsSignal, "-ES-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component(EpicsSignal, "-ES-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
_stop_command = Component(EpicsSignal, "-ES-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
def raster_setup(self, positions, columns, angle, etime):
"""configures parameters for a raster scan
positions : a list of xyz indicating the positions of each cell
columns: an integer with how many columns
angle : the oscillation angle for each row
etime : the exposure time per cell
"""
self.parent.axisModeDirect.set(1).wait()
if columns == 1:
raise RuntimeWarning("Fast scans are not available with vertical line scans.")
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self.x_start.set(start_x).wait()
self.x_end.set(end_x).wait()
self.omega.set(self.position).wait()
self.osc.set(angle).wait()
self.celltime.set(etime).wait()
self.y_start.set(y1).wait()
self.z_start.set(z1).wait()
self.y_step.set(ystep).wait()
self.z_step.set(zstep).wait()
self.columns.set(columns).wait()
self.rows.set(rows).wait()
def raster_next_row_yz(self):
r = self._raster_current_row
self._raster_current_row = r + 1
y, z = (
r * self.step_y.value + self._raster_starting_y,
r * self.step_z.value + self._raster_starting_z,
)
if r < self._raster_num_rows:
return (y, z)
else:
return (None, None)
def next_row(self):
"""start rastering a new row"""
self.grid_next.set(1).wait()
def raster_get_ready(self):
"""
WTF is this???
"""
self.get_ready.set(1).wait()
for n in range(10):
try:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==ABR_READY)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.grid_done, isReady, timeout=2, settle_time=0.5)
status.wait()
except TimeoutError as e:
print(str(e), end=" ")
print(" --- trying again.")
self.get_ready.put(1, wait=True)
def raster_scan_row(self):
"""start rastering a new row, either first or following rows"""
if ABR_READY == self._grid_done.get():
self.grid_start.set(1)
else:
self.grid_next.set(1)
def is_scan_done(self):
return 1 == scan_done.get()
def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, isReady, timeout=timeout, settle_time=0.5)
status.wait()
return status
def raster_is_row_scanning(self):
return 1 == self.scan_done.get()
def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus:
# Define wait subscription
timestamp_ = 0
def isReady(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.row_done, isReady, timeout=timeout, settle_time=0.5)
status.wait()
return status
class A3200Oscillator(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
ostart_pos = Component(EpicsSignal, "-ES-OSC:START-POS", put_complete=True, kind=Kind.config)
orange = Component(EpicsSignal, "-ES-OSC:RANGE", put_complete=True, kind=Kind.config)
etime = Component(EpicsSignal, "-ES-OSC:ETIME", put_complete=True, kind=Kind.config)
get_ready = Component(EpicsSignal, "-ES-OSC:READY.PROC", put_complete=True, kind=Kind.config)
taskStart = Component(EpicsSignal, "-ES-OSC:START", put_complete=True, kind=Kind.config)
phase = Component(EpicsSignal, "-ES-OSC:DONE", auto_monitor=True, kind=Kind.config)
@property
def is_done(self):
return ABR_DONE == self.phase.get()
@property
def is_ready(self):
return ABR_READY == self.phase.get()
@property
def is_busy(self):
return ABR_BUSY == self.phase.get()
def wait_status(self, target, timeout=60.0) -> SubscriptionStatus:
"""Wait for the Aertotech IOC to reach the desired `status`.
PARAMETERS
`status` can be any the three ABR_DONE, ABR_READY or ABR_BUSY.
RETURNS
SubscriptionStatus : A waitable status subscribed on 'phase' updates.
"""
# Define wait until the busy flag goes down (excluding initial update)
def inStatus(*args, old_value, value, timestamp, **kwargs):
result = bool(value==target)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.phase, inStatus, timeout=timeout, settle_time=0.5)
return status
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega")