This commit is contained in:
gac-x06da
2025-01-24 15:13:24 +01:00
parent 24302c244d
commit 015bf2ee3b
5 changed files with 266 additions and 192 deletions
+75 -72
View File
@@ -78,11 +78,14 @@ Examples
abr.stop() # this function only returns after the STATUS is back to OK abr.stop() # this function only returns after the STATUS is back to OK
""" """
import time import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
try: try:
from .A3200enums import AbrCmd, AbrMode from .A3200enums import AbrCmd, AbrMode
@@ -92,19 +95,17 @@ except ImportError:
try: try:
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
except ModuleNotFoundError: except ModuleNotFoundError:
import logging import logging
logger = logging.getLogger("A3200") logger = logging.getLogger("A3200")
# pylint: disable=logging-fstring-interpolation # pylint: disable=logging-fstring-interpolation
class AerotechAbrMixin(CustomDeviceMixin): class AerotechAbrMixin(CustomDeviceMixin):
""" Configuration class for the Aerotech A3200 controller for the ABR stage""" """Configuration class for the Aerotech A3200 controller for the ABR stage"""
def on_stage(self): def on_stage(self):
""" """
@@ -116,8 +117,8 @@ class AerotechAbrMixin(CustomDeviceMixin):
d = {} d = {}
if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"): if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"):
scanargs = self.parent.scaninfo.scan_msg.info['kwargs'] scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info['scan_name'] scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in ("standardscan"): if scanname in ("standardscan"):
scan_start = scanargs["start"] scan_start = scanargs["start"]
@@ -186,26 +187,23 @@ class AerotechAbrMixin(CustomDeviceMixin):
d["var_9"] = 0 d["var_9"] = 0
# Reconfigure if got a valid scan config # Reconfigure if got a valid scan config
if len(d)>0: if len(d) > 0:
self.parent.configure(d) self.parent.configure(d)
# Stage the parent # Stage the parent
self.parent.bluestage() self.parent.bluestage()
def on_kickoff(self): def on_kickoff(self):
""" Kick off parent """ """Kick off parent"""
self.parent.bluekickoff() self.parent.bluekickoff()
def on_unstage(self): def on_unstage(self):
""" Unstage the ABR controller""" """Unstage the ABR controller"""
self.parent.blueunstage() self.parent.blueunstage()
class AerotechAbrStage(PsiDeviceBase): class AerotechAbrStage(PsiDeviceBase):
""" Standard PX stage on A3200 controller """Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX This is the wrapper class for the standard rotation stage layout for the PX
beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and
@@ -214,23 +212,20 @@ class AerotechAbrStage(PsiDeviceBase):
is running as an AeroBasic program on the controller and we communicate to is running as an AeroBasic program on the controller and we communicate to
it via 10+1 global variables. it via 10+1 global variables.
""" """
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ['reset', 'kickoff', 'complete']
taskStop = Component( custom_prepare_cls = AerotechAbrMixin
EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted) USER_ACCESS = ["reset", "kickoff", "complete"]
status = Component(
EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted) taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
clear = Component( status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted) clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only) # Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component( axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component( axisModeDirect = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted) EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
axisAxesMode = Component( )
EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted) axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
# Shutter box is missing readback so the -GET signal is installed on the VME # Shutter box is missing readback so the -GET signal is installed on the VME
# _shutter = Component( # _shutter = Component(
@@ -244,12 +239,13 @@ class AerotechAbrStage(PsiDeviceBase):
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal) gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO... # For some reason the task interface is called PSO...
scan_command = Component( scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component( start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
stop_command = Component( stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
# Global variables to controll AeroBasic scripts # Global variables to controll AeroBasic scripts
_var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted) _var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
@@ -274,7 +270,7 @@ class AerotechAbrStage(PsiDeviceBase):
raster_num_rows = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config) raster_num_rows = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None: def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
""" Set axis mode to direct/measurement mode. """Set axis mode to direct/measurement mode.
Measurement ensures that the scrips run undisturbed by blocking axis Measurement ensures that the scrips run undisturbed by blocking axis
motion commands from the IOC (i.e. internal only). motion commands from the IOC (i.e. internal only).
@@ -284,13 +280,13 @@ class AerotechAbrStage(PsiDeviceBase):
mode : str mode : str
Valid values are 'direct' and 'measuring'. Valid values are 'direct' and 'measuring'.
""" """
if mode=="direct": if mode == "direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait() self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode=="measuring": if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait() self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def configure(self, d: dict) -> tuple: def configure(self, d: dict) -> tuple:
"""" Configure the exposure scripts """ " Configure the exposure scripts
Script execution at the PX beamlines is based on scripts that are always Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by running on the controller that execute commands when commanded by
@@ -317,54 +313,56 @@ class AerotechAbrStage(PsiDeviceBase):
old = self.read_configuration() old = self.read_configuration()
# ToDo: Check if idle before reconfiguring # ToDo: Check if idle before reconfiguring
self.scan_command.set(d['scan_command']).wait() self.scan_command.set(d["scan_command"]).wait()
# Set the corresponding global variables # Set the corresponding global variables
if "var_1" in d and d['var_1'] is not None: if "var_1" in d and d["var_1"] is not None:
self._var_1.set(d['var_1']).wait() self._var_1.set(d["var_1"]).wait()
if "var_2" in d and d['var_2'] is not None: if "var_2" in d and d["var_2"] is not None:
self._var_2.set(d['var_2']).wait() self._var_2.set(d["var_2"]).wait()
if "var_3" in d and d['var_3'] is not None: if "var_3" in d and d["var_3"] is not None:
self._var_3.set(d['var_3']).wait() self._var_3.set(d["var_3"]).wait()
if "var_4" in d and d['var_4'] is not None: if "var_4" in d and d["var_4"] is not None:
self._var_4.set(d['var_4']).wait() self._var_4.set(d["var_4"]).wait()
if "var_5" in d and d['var_5'] is not None: if "var_5" in d and d["var_5"] is not None:
self._var_5.set(d['var_5']).wait() self._var_5.set(d["var_5"]).wait()
if "var_6" in d and d['var_6'] is not None: if "var_6" in d and d["var_6"] is not None:
self._var_6.set(d['var_6']).wait() self._var_6.set(d["var_6"]).wait()
if "var_7" in d and d['var_7'] is not None: if "var_7" in d and d["var_7"] is not None:
self._var_7.set(d['var_7']).wait() self._var_7.set(d["var_7"]).wait()
if "var_8" in d and d['var_8'] is not None: if "var_8" in d and d["var_8"] is not None:
self._var_8.set(d['var_8']).wait() self._var_8.set(d["var_8"]).wait()
if "var_9" in d and d['var_9'] is not None: if "var_9" in d and d["var_9"] is not None:
self._var_9.set(d['var_9']).wait() self._var_9.set(d["var_9"]).wait()
if "var_10" in d and d['var_10'] is not None: if "var_10" in d and d["var_10"] is not None:
self._var_10.set(d['var_10']).wait() self._var_10.set(d["var_10"]).wait()
new = self.read_configuration() new = self.read_configuration()
return old, new return old, new
def bluestage(self): def bluestage(self):
""" Bluesky-style stage """Bluesky-style stage
Since configuration synchronization is not guaranteed, this does Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff(). nothing. The script launched by kickoff().
""" """
pass pass
def bluekickoff(self, timeout=1) -> SubscriptionStatus: def bluekickoff(self, timeout=1) -> SubscriptionStatus:
""" Kick off the set program """ """Kick off the set program"""
self.start_command.set(1).wait() self.start_command.set(1).wait()
# Define wait until the busy flag goes high # Define wait until the busy flag goes high
def is_busy(*args, value, **kwargs): def is_busy(*args, value, **kwargs):
return bool(value==0) return bool(value == 0)
# Subscribe and wait for update # Subscribe and wait for update
status = SubscriptionStatus(self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1) status = SubscriptionStatus(
self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1
)
return status return status
def blueunstage(self, settle_time=0.1): def blueunstage(self, settle_time=0.1):
""" Stops current script and releases the axes""" """Stops current script and releases the axes"""
# Disarm commands # Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait() self.stop_command.set(1).wait()
@@ -372,21 +370,24 @@ class AerotechAbrStage(PsiDeviceBase):
self.set_axis_mode("direct", settle_time=settle_time) self.set_axis_mode("direct", settle_time=settle_time)
def complete(self, timeout=None) -> SubscriptionStatus: def complete(self, timeout=None) -> SubscriptionStatus:
""" Waits for task execution """Waits for task execution
NOTE: Original complete was raster scanner complete... NOTE: Original complete was raster scanner complete...
""" """
# Define wait until the busy flag goes down (excluding initial update) # Define wait until the busy flag goes down (excluding initial update)
def is_idle(*args, value, **kwargs): def is_idle(*args, value, **kwargs):
return bool(value==1) return bool(value == 1)
# Subscribe and wait for update # Subscribe and wait for update
# status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5) # status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5) status = SubscriptionStatus(
self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5
)
return status return status
def reset(self, settle_time=0.1, wait_after_reload=1) -> None: def reset(self, settle_time=0.1, wait_after_reload=1) -> None:
""" Resets the Aerotech controller state """Resets the Aerotech controller state
Attempts to reset the currently running measurement task on the A3200 Attempts to reset the currently running measurement task on the A3200
by stopping current motions, reloading aerobasic programs and going by stopping current motions, reloading aerobasic programs and going
@@ -405,8 +406,7 @@ class AerotechAbrStage(PsiDeviceBase):
self.set_axis_mode("direct", settle_time=settle_time) self.set_axis_mode("direct", settle_time=settle_time)
def stop(self, settle_time=1.0) -> None: def stop(self, settle_time=1.0) -> None:
""" Stops current motions """Stops current motions"""
"""
# Disarm commands # Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait() self.stop_command.set(1).wait()
@@ -466,7 +466,7 @@ class AerotechAbrStage(PsiDeviceBase):
# return state == self._shutter.get() # return state == self._shutter.get()
def wait_for_movements(self, timeout=60.0): def wait_for_movements(self, timeout=60.0):
""" Waits for all motor movements""" """Waits for all motor movements"""
t_start = time.time() t_start = time.time()
t_elapsed = 0 t_elapsed = 0
@@ -480,7 +480,10 @@ class AerotechAbrStage(PsiDeviceBase):
"""Chechs if all axes are DONE""" """Chechs if all axes are DONE"""
return not ( return not (
self.omega_done.get() self.omega_done.get()
and self.gmx_done.get() and self.gmy_done.get() and self.gmz_done.get() ) and self.gmx_done.get()
and self.gmy_done.get()
and self.gmz_done.get()
)
# def start_exposure(self): # def start_exposure(self):
# """Starts the previously configured exposure.""" # """Starts the previously configured exposure."""
+21
View File
@@ -6,50 +6,67 @@ Enumerations for the MX specific Aerotech A3200 stage.
""" """
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
class AbrStatus: class AbrStatus:
"""ABR measurement task status""" """ABR measurement task status"""
DONE = 0 DONE = 0
READY = 1 READY = 1
BUSY = 2 BUSY = 2
ABR_DONE = 0 ABR_DONE = 0
ABR_READY = 1 ABR_READY = 1
ABR_BUSY = 2 ABR_BUSY = 2
class AbrGridStatus: class AbrGridStatus:
"""ABR grid scan status""" """ABR grid scan status"""
BUSY = 0 BUSY = 0
DONE = 1 DONE = 1
GRID_SCAN_BUSY = 0 GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1 GRID_SCAN_DONE = 1
class AbrMode: class AbrMode:
"""ABR mode status""" """ABR mode status"""
DIRECT = 0 DIRECT = 0
MEASURING = 1 MEASURING = 1
DIRECT_MODE = 0 DIRECT_MODE = 0
MEASURING_MODE = 1 MEASURING_MODE = 1
class AbrShutterStatus: class AbrShutterStatus:
"""ABR shutter status""" """ABR shutter status"""
CLOSE = 0 CLOSE = 0
OPEN = 1 OPEN = 1
SHUTTER_CLOSE = 0 SHUTTER_CLOSE = 0
SHUTTER_OPEN = 1 SHUTTER_OPEN = 1
class AbrGridPeriod: class AbrGridPeriod:
"""ABR grid period""" """ABR grid period"""
FULL = 0 FULL = 0
HALF = 1 HALF = 1
FULL_PERIOD = 0 FULL_PERIOD = 0
HALF_PERIOD = 1 HALF_PERIOD = 1
class AbrCmd: class AbrCmd:
"""ABR command table""" """ABR command table"""
NONE = 0 NONE = 0
RASTER_SCAN_SIMPLE = 1 RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2 MEASURE_STANDARD = 2
@@ -73,6 +90,7 @@ class AbrCmd:
SCAN_SASTT_V2 = 20 SCAN_SASTT_V2 = 20
SCAN_SASTT_V3 = 21 SCAN_SASTT_V3 = 21
CMD_NONE = 0 CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1 CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2 CMD_MEASURE_STANDARD = 2
@@ -96,8 +114,10 @@ CMD_SCAN_SASTT = 19
CMD_SCAN_SASTT_V2 = 20 CMD_SCAN_SASTT_V2 = 20
CMD_SCAN_SASTT_V3 = 21 CMD_SCAN_SASTT_V3 = 21
class AbrAxis: class AbrAxis:
"""ABR axis index""" """ABR axis index"""
OMEGA = 1 OMEGA = 1
GMX = 2 GMX = 2
GMY = 3 GMY = 3
@@ -105,6 +125,7 @@ class AbrAxis:
STY = 5 STY = 5
STZ = 6 STZ = 6
AXIS_OMEGA = 1 AXIS_OMEGA = 1
AXIS_GMX = 2 AXIS_GMX = 2
AXIS_GMY = 3 AXIS_GMY = 3
+89 -55
View File
@@ -6,20 +6,27 @@ Created on Tue Jun 11 11:28:38 2024
""" """
import types import types
import math import math
from ophyd import (Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind, from ophyd import (
PositionerBase) Component,
PVPositioner,
Device,
Signal,
EpicsSignal,
EpicsSignalRO,
Kind,
PositionerBase,
)
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
from .A3200enums import * from .A3200enums import *
ABR_DONE = 0 ABR_DONE = 0
ABR_READY = 1 ABR_READY = 1
ABR_BUSY = 2 ABR_BUSY = 2
@@ -32,7 +39,7 @@ MEASURING_MODE = 1
class A3200Axis(PVPositioner): class A3200Axis(PVPositioner):
""" Positioner wrapper for A3200 axes """Positioner wrapper for A3200 axes
Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC
@@ -56,19 +63,22 @@ class A3200Axis(PVPositioner):
base_pv : str (situational) base_pv : str (situational)
IOC PV name root, i.e. X06DA-ES if standalone class. IOC PV name root, i.e. X06DA-ES if standalone class.
""" """
USER_ACCESS = ['omove']
USER_ACCESS = ["omove"]
abr_mode_direct = Component( abr_mode_direct = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted) EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
abr_mode = Component( abr_mode = Component(
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted) EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted
)
# Basic PV positioner interface # Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config) done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted) readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two... # Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config) setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
#setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config) # setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config) velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config) status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
@@ -88,7 +98,7 @@ class A3200Axis(PVPositioner):
_rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config) _rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config)
_rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config) _rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config)
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config) _rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
#_rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config) # _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
hlm = Component(Signal, kind=Kind.config) hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config) llm = Component(Signal, kind=Kind.config)
@@ -96,37 +106,69 @@ class A3200Axis(PVPositioner):
vmax = Component(Signal, kind=Kind.config) vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config) offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
def __init__(self, prefix="", *, name, base_pv="", kind=None, read_attrs=None, def __init__(
configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs): self,
""" __init__ MUST have a full argument list""" prefix="",
*,
name,
base_pv="",
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
llm=0,
hlm=0,
vmin=0,
vmax=0,
**kwargs,
):
"""__init__ MUST have a full argument list"""
# Patching the parent's PVs into the axis class to check for direct/locked mode # Patching the parent's PVs into the axis class to check for direct/locked mode
if parent is None: if parent is None:
def maybe_add_prefix(self, instance, kw, suffix): def maybe_add_prefix(self, instance, kw, suffix):
# Patched not to enforce parent prefix when no parent # Patched not to enforce parent prefix when no parent
if kw in self.add_prefix: if kw in self.add_prefix:
return suffix return suffix
return suffix return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType( self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, maybe_add_prefix, self.__class__.__dict__["abr_mode"]
self.__class__.__dict__["abr_mode"]) )
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType( self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"]
self.__class__.__dict__["abr_mode_direct"]) )
logger.info(self.__class__.__dict__["abr_mode"].kwargs) logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE" self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT" self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, super().__init__(
configuration_attrs=configuration_attrs, parent=parent, **kwargs) prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.llm.set(llm).wait() self.llm.set(llm).wait()
self.hlm.set(hlm).wait() self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait() self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait() self.vmax.set(vmax).wait()
def omove(self, position, wait=True, timeout=None, moved_cb=None, velocity=None, relative=False, def omove(
direct=False, **kwargs) -> MoveStatus: self,
""" Native absolute/relative movement on the A3200 """ position,
wait=True,
timeout=None,
moved_cb=None,
velocity=None,
relative=False,
direct=False,
**kwargs,
) -> MoveStatus:
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode # Check if we're in direct movement mode
if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"): if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"):
@@ -169,19 +211,18 @@ class A3200Axis(PVPositioner):
raise raise
return status return status
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus: def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
""" Exposes the ophyd move command through BEC abstraction""" """Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs) return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status: def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
""" Repeated single axis zigzag scan I guess PSO should be configured for this""" """Repeated single axis zigzag scan I guess PSO should be configured for this"""
self._rock_dist.put(distance) self._rock_dist.put(distance)
self._rock_count.put(counts) self._rock_count.put(counts)
if velocity is not None: if velocity is not None:
self._rock_velo.put(velocity) self._rock_velo.put(velocity)
#if acceleration is not None: # if acceleration is not None:
# self._rock_accel.put(acceleration) # self._rock_accel.put(acceleration)
self._rock.put(1) self._rock.put(1)
@@ -190,7 +231,6 @@ class A3200Axis(PVPositioner):
status.wait() status.wait()
return status return status
# def is_omega_ok(self): # def is_omega_ok(self):
# """Checks omega axis status""" # """Checks omega axis status"""
# return 0 == self.self.omega.status.get() # return 0 == self.self.omega.status.get()
@@ -216,21 +256,11 @@ class A3200Axis(PVPositioner):
# time.sleep(0.2) # time.sleep(0.2)
class A3200RasterScanner(Device): class A3200RasterScanner(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the """Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics.""" functionality for diagnostics."""
x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config) x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config)
x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config) x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config)
@@ -259,10 +289,11 @@ class A3200RasterScanner(Device):
# Also needs the command interface # Also needs the command interface
_zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted) _zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component( _start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
_stop_command = Component( _stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
def raster_setup(self, positions, columns, angle, etime): def raster_setup(self, positions, columns, angle, etime):
"""configures parameters for a raster scan """configures parameters for a raster scan
@@ -334,7 +365,7 @@ class A3200RasterScanner(Device):
def raster_get_ready(self): def raster_get_ready(self):
""" """
WTF is this??? WTF is this???
""" """
self.get_ready.set(1).wait() self.get_ready.set(1).wait()
@@ -342,9 +373,10 @@ class A3200RasterScanner(Device):
try: try:
# Define wait until the busy flag goes down (excluding initial update) # Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0 timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs): def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_ nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==ABR_READY) result = False if (timestamp_ == 0) else bool(value == ABR_READY)
print(f"Old {old_value}\tNew: {value}\tResult: {result}") print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp timestamp_ = timestamp
return result return result
@@ -370,9 +402,10 @@ class A3200RasterScanner(Device):
def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus: def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus:
# Define wait until the busy flag goes down (excluding initial update) # Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0 timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs): def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_ nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1) result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}") print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp timestamp_ = timestamp
return result return result
@@ -389,9 +422,10 @@ class A3200RasterScanner(Device):
def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus: def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus:
# Define wait subscription # Define wait subscription
timestamp_ = 0 timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs): def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_ nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1) result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}") print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp timestamp_ = timestamp
return result return result
@@ -403,16 +437,17 @@ class A3200RasterScanner(Device):
return status return status
def complete(self, timeout=None) -> DeviceStatus: def complete(self, timeout=None) -> DeviceStatus:
""" Wait for the grid scanner to finish """Wait for the grid scanner to finish
TODO: Probably redundant with task status wait? TODO: Probably redundant with task status wait?
""" """
if timeout is not None: if timeout is not None:
# Define wait until the busy flag goes down (excluding initial update) # Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0 timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs): def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_ nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE) result = False if (timestamp_ == 0) else bool(value == GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}") print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp timestamp_ = timestamp
return result return result
@@ -425,11 +460,10 @@ class A3200RasterScanner(Device):
return status return status
class A3200Oscillator(Device): class A3200Oscillator(Device):
"""No clue what this does, seems to be redundant with the task based grid scanners... """No clue what this does, seems to be redundant with the task based grid scanners..."""
"""
ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config) ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config)
orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config) orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config)
etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config) etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config)
get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config) get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config)
@@ -460,9 +494,10 @@ class A3200Oscillator(Device):
RETURNS RETURNS
SubscriptionStatus : A waitable status subscribed on 'phase' updates. SubscriptionStatus : A waitable status subscribed on 'phase' updates.
""" """
# Define wait until the busy flag goes down (excluding initial update) # Define wait until the busy flag goes down (excluding initial update)
def in_status(*args, old_value, value, _, **kwargs): def in_status(*args, old_value, value, _, **kwargs):
result = bool(value==target) result = bool(value == target)
# print(f"Old {old_value}\tNew: {value}\tResult: {result}") # print(f"Old {old_value}\tNew: {value}\tResult: {result}")
return result return result
@@ -471,7 +506,6 @@ class A3200Oscillator(Device):
return status return status
# Automatically start an axis if directly invoked # Automatically start an axis if directly invoked
if __name__ == "__main__": if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega") omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega")
+40 -23
View File
@@ -6,14 +6,16 @@ import requests
try: try:
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
except ModuleNotFoundError: except ModuleNotFoundError:
import logging import logging
logger = logging.getLogger("SmarGon") logger = logging.getLogger("SmarGon")
class SmarGonSignal(Signal): class SmarGonSignal(Signal):
""" SmarGonSignal (R/W) """SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with motion status readback from smargopolo, this should be substituted with
@@ -28,7 +30,7 @@ class SmarGonSignal(Signal):
# self.get() # self.get()
def put(self, value, *args, timestamp=None, **kwargs): def put(self, value, *args, timestamp=None, **kwargs):
""" Overriden put to add communication with smargopolo""" """Overriden put to add communication with smargopolo"""
# Validate new value # Validate new value
self.check_value(value) self.check_value(value)
@@ -44,15 +46,16 @@ class SmarGonSignal(Signal):
self.value = r[self.addr.upper()] self.value = r[self.addr.upper()]
# Notify subscribers # Notify subscribers
self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value, self._run_subs(
value=value, timestamp=self._timestamp) sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
@property @property
def limits(self): def limits(self):
return self._limits return self._limits
def check_value(self, value, **kwargs): def check_value(self, value, **kwargs):
""" Check if value falls within limits""" """Check if value falls within limits"""
lol = self.limits[0] lol = self.limits[0]
if lol is not None: if lol is not None:
if value < lol: if value < lol:
@@ -70,9 +73,9 @@ class SmarGonSignal(Signal):
class SmarGonSignalRO(Signal): class SmarGonSignalRO(Signal):
""" Small helper class for read-only parameters PVs from SmarGon. """Small helper class for read-only parameters PVs from SmarGon.
TODO: Add monitoring TODO: Add monitoring
""" """
def __init__(self, *args, read_addr="readbackSCS", **kwargs): def __init__(self, *args, read_addr="readbackSCS", **kwargs):
@@ -111,8 +114,9 @@ class SmarGonAxis(Device):
This class controls the SmarGon goniometer via the REST interface. This class controls the SmarGon goniometer via the REST interface.
""" """
# Status attributes # Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config) corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config) mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
@@ -134,17 +138,25 @@ class SmarGonAxis(Device):
parent=None, parent=None,
device_manager=None, device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000", sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit = None, low_limit=None,
high_limit = None, high_limit=None,
**kwargs, **kwargs,
) -> None: ) -> None:
self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}" self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}"
self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}" self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.sg_url._metadata["write_access"] = False self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait() self.sg_url.set(sg_url, force=True).wait()
@@ -160,7 +172,9 @@ class SmarGonAxis(Device):
cmd = f"{self.sg_url.get()}/{address}" cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1) r = requests.get(cmd, timeout=1)
if not r.ok: if not r.ok:
raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}") raise RuntimeError(
f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}"
)
return r.json() return r.json()
def _go_n_put(self, address, **kwargs): def _go_n_put(self, address, **kwargs):
@@ -168,19 +182,22 @@ class SmarGonAxis(Device):
cmd = f"{self.sg_url.get()}/{address}" cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1) r = requests.put(cmd, timeout=1)
if not r.ok: if not r.ok:
raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}") raise RuntimeError(
f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}"
)
return r.json() return r.json()
if __name__ == "__main__": if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000") shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000") shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000") shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection() shx.wait_for_connection()
shy.wait_for_connection() shy.wait_for_connection()
shz.wait_for_connection() shz.wait_for_connection()
+19 -20
View File
@@ -3,6 +3,7 @@
Scan primitives for standard BEC scans at the PX beamlines at SLS. Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels. Theese scans define the event model and can be called from higher levels.
""" """
import numpy as np import numpy as np
from bec_lib import bec_logger from bec_lib import bec_logger
@@ -11,9 +12,9 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger logger = bec_logger.logger
class AbrCmd: class AbrCmd:
""" Valid Aerotech ABR scan commands from the AeroBasic files""" """Valid Aerotech ABR scan commands from the AeroBasic files"""
NONE = 0 NONE = 0
RASTER_SCAN_SIMPLE = 1 RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2 MEASURE_STANDARD = 2
@@ -38,9 +39,8 @@ class AbrCmd:
# SCAN_SASTT_V3 = 21 # SCAN_SASTT_V3 = 21
class AerotechFlyscanBase(AsyncFlyScanBase): class AerotechFlyscanBase(AsyncFlyScanBase):
""" Base class for MX flyscans """Base class for MX flyscans
Low-level base class for standard scans at the PX beamlines at SLS. Theese Low-level base class for standard scans at the PX beamlines at SLS. Theese
scans use the A3200 rotation stage and the actual experiment is performed scans use the A3200 rotation stage and the actual experiment is performed
@@ -56,20 +56,21 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
abr_complete : bool abr_complete : bool
Wait for the launched ABR task to complete. Wait for the launched ABR task to complete.
""" """
scan_type = "fly" scan_type = "fly"
scan_report_hint = "table" scan_report_hint = "table"
arg_input = {} arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config # Aerotech stage config
abr_raster_reset =False abr_raster_reset = False
abr_complete = False abr_complete = False
abr_timeout = None abr_timeout = None
pointID = 0 pointID = 0
num_pos = 0 num_pos = 0
def __init__(self, *args, parameter: dict = None, **kwargs): def __init__(self, *args, parameter: dict = None, **kwargs):
""" Just set num_pos=0 to avoid hanging and override defaults if explicitly set from """Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters. parameters.
""" """
super().__init__(parameter=parameter, **kwargs) super().__init__(parameter=parameter, **kwargs)
@@ -81,14 +82,14 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
self.abr_timeout = self.caller_kwargs.get("abr_timeout") self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self): def pre_scan(self):
""" Mostly just checking if ABR stage is ok...""" """Mostly just checking if ABR stage is ok..."""
# TODO: Move roughly to start position??? # TODO: Move roughly to start position???
# ABR status checking # ABR status checking
stat = yield from self.stubs.send_rpc_and_wait("abr","status.get") stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get")
if stat not in (0, "OK"): if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset") raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
task = yield from self.stubs.send_rpc_and_wait("abr","task1.get") task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get")
# From what I got values are copied to local vars at the start of scan, # From what I got values are copied to local vars at the start of scan,
# so only kickoff should be forbidden. # so only kickoff should be forbidden.
if task not in (1, "OK"): if task not in (1, "OK"):
@@ -106,8 +107,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
# return super().stage() # return super().stage()
def scan_core(self): def scan_core(self):
""" The actual scan logic comes here. """The actual scan logic comes here."""
"""
# Kick off the run # Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff") yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
logger.info("Measurement launched on the ABR stage...") logger.info("Measurement launched on the ABR stage...")
@@ -127,9 +127,8 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
return super().cleanup() return super().cleanup()
class MeasureStandardWedge(AerotechFlyscanBase): class MeasureStandardWedge(AerotechFlyscanBase):
""" Standard wedge scan using the OMEGA motor """Standard wedge scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range` Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output. during `move_time` on the Omega axis with PSO output.
@@ -152,13 +151,13 @@ class MeasureStandardWedge(AerotechFlyscanBase):
ready_rate : float, optional ready_rate : float, optional
No clue what is this... (default=500) No clue what is this... (default=500)
""" """
scan_name = "standardscan" scan_name = "standardscan"
required_kwargs = ["start", "range", "move_time"] required_kwargs = ["start", "range", "move_time"]
class MeasureVerticalLine(AerotechFlyscanBase): class MeasureVerticalLine(AerotechFlyscanBase):
""" Vertical line scan using the GMY motor """Vertical line scan using the GMY motor
Simple relative continous line scan that records a single vertical line Simple relative continous line scan that records a single vertical line
with PSO output. There's no actual stepping, it's only used for velocity with PSO output. There's no actual stepping, it's only used for velocity
@@ -180,13 +179,13 @@ class MeasureVerticalLine(AerotechFlyscanBase):
exp_time : float exp_time : float
Eeffective exposure time per step [s] Eeffective exposure time per step [s]
""" """
scan_name = "vlinescan" scan_name = "vlinescan"
required_kwargs = ["exp_time", "range", "steps"] required_kwargs = ["exp_time", "range", "steps"]
class MeasureRasterSimple(AerotechFlyscanBase): class MeasureRasterSimple(AerotechFlyscanBase):
""" Simple raster scan """Simple raster scan
Measure a simplified relative zigzag raster scan in the X-Y plane. Measure a simplified relative zigzag raster scan in the X-Y plane.
The scan is relative assumes the goniometer is currently at the CENTER of The scan is relative assumes the goniometer is currently at the CENTER of
@@ -213,14 +212,13 @@ class MeasureRasterSimple(AerotechFlyscanBase):
steps_y : int steps_y : int
Number of scan steps in Y (slow). Number of scan steps in Y (slow).
""" """
scan_name = "rasterscan" scan_name = "rasterscan"
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"] required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
class MeasureScreening(AerotechFlyscanBase): class MeasureScreening(AerotechFlyscanBase):
""" Sample screening scan """Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking Sample screening scan that scans intervals on the rotation axis taking
1 image/interval. This makes sure that we hit diffraction peaks if there 1 image/interval. This makes sure that we hit diffraction peaks if there
@@ -248,5 +246,6 @@ class MeasureScreening(AerotechFlyscanBase):
delta : float delta : float
Safety margin for sub-range movements (default: 0.5) [deg]. Safety margin for sub-range movements (default: 0.5) [deg].
""" """
scan_name = "screeningscan" scan_name = "screeningscan"
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"] required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]