This commit is contained in:
gac-x06da
2025-01-24 15:13:24 +01:00
parent 24302c244d
commit 015bf2ee3b
5 changed files with 266 additions and 192 deletions

View File

@@ -78,11 +78,14 @@ Examples
abr.stop() # this function only returns after the STATUS is back to OK
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
try:
from .A3200enums import AbrCmd, AbrMode
@@ -92,23 +95,21 @@ except ImportError:
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("A3200")
# pylint: disable=logging-fstring-interpolation
class AerotechAbrMixin(CustomDeviceMixin):
""" Configuration class for the Aerotech A3200 controller for the ABR stage"""
"""Configuration class for the Aerotech A3200 controller for the ABR stage"""
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
@@ -116,8 +117,8 @@ class AerotechAbrMixin(CustomDeviceMixin):
d = {}
if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"):
scanargs = self.parent.scaninfo.scan_msg.info['kwargs']
scanname = self.parent.scaninfo.scan_msg.info['scan_name']
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in ("standardscan"):
scan_start = scanargs["start"]
@@ -186,27 +187,24 @@ class AerotechAbrMixin(CustomDeviceMixin):
d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d)>0:
if len(d) > 0:
self.parent.configure(d)
# Stage the parent
self.parent.bluestage()
def on_kickoff(self):
""" Kick off parent """
"""Kick off parent"""
self.parent.bluekickoff()
def on_unstage(self):
""" Unstage the ABR controller"""
"""Unstage the ABR controller"""
self.parent.blueunstage()
class AerotechAbrStage(PsiDeviceBase):
""" Standard PX stage on A3200 controller
"""Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX
beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and
the associated motion axes GMX, GMY and GMZ. The ophyd class associates to
@@ -214,23 +212,20 @@ class AerotechAbrStage(PsiDeviceBase):
is running as an AeroBasic program on the controller and we communicate to
it via 10+1 global variables.
"""
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ['reset', 'kickoff', 'complete']
taskStop = Component(
EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(
EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
clear = Component(
EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ["reset", "kickoff", "complete"]
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component(
EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted)
axisAxesMode = Component(
EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
# Shutter box is missing readback so the -GET signal is installed on the VME
# _shutter = Component(
@@ -244,12 +239,13 @@ class AerotechAbrStage(PsiDeviceBase):
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(
EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
# Global variables to controll AeroBasic scripts
_var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
@@ -274,23 +270,23 @@ class AerotechAbrStage(PsiDeviceBase):
raster_num_rows = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
""" Set axis mode to direct/measurement mode.
"""Set axis mode to direct/measurement mode.
Measurement ensures that the scrips run undisturbed by blocking axis
motion commands from the IOC (i.e. internal only).
motion commands from the IOC (i.e. internal only).
Parameters:
-----------
mode : str
Valid values are 'direct' and 'measuring'.
"""
if mode=="direct":
if mode == "direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode=="measuring":
if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def configure(self, d: dict) -> tuple:
"""" Configure the exposure scripts
""" " Configure the exposure scripts
Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by
@@ -317,54 +313,56 @@ class AerotechAbrStage(PsiDeviceBase):
old = self.read_configuration()
# ToDo: Check if idle before reconfiguring
self.scan_command.set(d['scan_command']).wait()
self.scan_command.set(d["scan_command"]).wait()
# Set the corresponding global variables
if "var_1" in d and d['var_1'] is not None:
self._var_1.set(d['var_1']).wait()
if "var_2" in d and d['var_2'] is not None:
self._var_2.set(d['var_2']).wait()
if "var_3" in d and d['var_3'] is not None:
self._var_3.set(d['var_3']).wait()
if "var_4" in d and d['var_4'] is not None:
self._var_4.set(d['var_4']).wait()
if "var_5" in d and d['var_5'] is not None:
self._var_5.set(d['var_5']).wait()
if "var_6" in d and d['var_6'] is not None:
self._var_6.set(d['var_6']).wait()
if "var_7" in d and d['var_7'] is not None:
self._var_7.set(d['var_7']).wait()
if "var_8" in d and d['var_8'] is not None:
self._var_8.set(d['var_8']).wait()
if "var_9" in d and d['var_9'] is not None:
self._var_9.set(d['var_9']).wait()
if "var_10" in d and d['var_10'] is not None:
self._var_10.set(d['var_10']).wait()
if "var_1" in d and d["var_1"] is not None:
self._var_1.set(d["var_1"]).wait()
if "var_2" in d and d["var_2"] is not None:
self._var_2.set(d["var_2"]).wait()
if "var_3" in d and d["var_3"] is not None:
self._var_3.set(d["var_3"]).wait()
if "var_4" in d and d["var_4"] is not None:
self._var_4.set(d["var_4"]).wait()
if "var_5" in d and d["var_5"] is not None:
self._var_5.set(d["var_5"]).wait()
if "var_6" in d and d["var_6"] is not None:
self._var_6.set(d["var_6"]).wait()
if "var_7" in d and d["var_7"] is not None:
self._var_7.set(d["var_7"]).wait()
if "var_8" in d and d["var_8"] is not None:
self._var_8.set(d["var_8"]).wait()
if "var_9" in d and d["var_9"] is not None:
self._var_9.set(d["var_9"]).wait()
if "var_10" in d and d["var_10"] is not None:
self._var_10.set(d["var_10"]).wait()
new = self.read_configuration()
return old, new
def bluestage(self):
""" Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""
pass
def bluekickoff(self, timeout=1) -> SubscriptionStatus:
""" Kick off the set program """
"""Kick off the set program"""
self.start_command.set(1).wait()
# Define wait until the busy flag goes high
def is_busy(*args, value, **kwargs):
return bool(value==0)
return bool(value == 0)
# Subscribe and wait for update
status = SubscriptionStatus(self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1)
status = SubscriptionStatus(
self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1
)
return status
def blueunstage(self, settle_time=0.1):
""" Stops current script and releases the axes"""
"""Stops current script and releases the axes"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
@@ -372,22 +370,25 @@ class AerotechAbrStage(PsiDeviceBase):
self.set_axis_mode("direct", settle_time=settle_time)
def complete(self, timeout=None) -> SubscriptionStatus:
""" Waits for task execution
"""Waits for task execution
NOTE: Original complete was raster scanner complete...
"""
# Define wait until the busy flag goes down (excluding initial update)
def is_idle(*args, value, **kwargs):
return bool(value==1)
return bool(value == 1)
# Subscribe and wait for update
# status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(
self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5
)
return status
def reset(self, settle_time=0.1, wait_after_reload=1) -> None:
""" Resets the Aerotech controller state
"""Resets the Aerotech controller state
Attempts to reset the currently running measurement task on the A3200
by stopping current motions, reloading aerobasic programs and going
to DIRECT mode.
@@ -405,8 +406,7 @@ class AerotechAbrStage(PsiDeviceBase):
self.set_axis_mode("direct", settle_time=settle_time)
def stop(self, settle_time=1.0) -> None:
""" Stops current motions
"""
"""Stops current motions"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
@@ -466,7 +466,7 @@ class AerotechAbrStage(PsiDeviceBase):
# return state == self._shutter.get()
def wait_for_movements(self, timeout=60.0):
""" Waits for all motor movements"""
"""Waits for all motor movements"""
t_start = time.time()
t_elapsed = 0
@@ -480,7 +480,10 @@ class AerotechAbrStage(PsiDeviceBase):
"""Chechs if all axes are DONE"""
return not (
self.omega_done.get()
and self.gmx_done.get() and self.gmy_done.get() and self.gmz_done.get() )
and self.gmx_done.get()
and self.gmy_done.get()
and self.gmz_done.get()
)
# def start_exposure(self):
# """Starts the previously configured exposure."""

View File

@@ -6,50 +6,67 @@ Enumerations for the MX specific Aerotech A3200 stage.
"""
# pylint: disable=too-few-public-methods
class AbrStatus:
"""ABR measurement task status"""
DONE = 0
READY = 1
BUSY = 2
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
class AbrGridStatus:
"""ABR grid scan status"""
BUSY = 0
DONE = 1
GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1
class AbrMode:
"""ABR mode status"""
DIRECT = 0
MEASURING = 1
DIRECT_MODE = 0
MEASURING_MODE = 1
class AbrShutterStatus:
"""ABR shutter status"""
CLOSE = 0
OPEN = 1
SHUTTER_CLOSE = 0
SHUTTER_OPEN = 1
class AbrGridPeriod:
"""ABR grid period"""
FULL = 0
HALF = 1
FULL_PERIOD = 0
HALF_PERIOD = 1
class AbrCmd:
"""ABR command table"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
@@ -73,6 +90,7 @@ class AbrCmd:
SCAN_SASTT_V2 = 20
SCAN_SASTT_V3 = 21
CMD_NONE = 0
CMD_RASTER_SCAN_SIMPLE = 1
CMD_MEASURE_STANDARD = 2
@@ -96,8 +114,10 @@ CMD_SCAN_SASTT = 19
CMD_SCAN_SASTT_V2 = 20
CMD_SCAN_SASTT_V3 = 21
class AbrAxis:
"""ABR axis index"""
OMEGA = 1
GMX = 2
GMY = 3
@@ -105,9 +125,10 @@ class AbrAxis:
STY = 5
STZ = 6
AXIS_OMEGA = 1
AXIS_GMX = 2
AXIS_GMY = 3
AXIS_GMZ = 4
AXIS_STY = 5
AXIS_STZ = 6
AXIS_STZ = 6

View File

@@ -6,20 +6,27 @@ Created on Tue Jun 11 11:28:38 2024
"""
import types
import math
from ophyd import (Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind,
PositionerBase)
from ophyd import (
Component,
PVPositioner,
Device,
Signal,
EpicsSignal,
EpicsSignalRO,
Kind,
PositionerBase,
)
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
from bec_lib import bec_logger
logger = bec_logger.logger
from .A3200enums import *
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
@@ -32,12 +39,12 @@ MEASURING_MODE = 1
class A3200Axis(PVPositioner):
""" Positioner wrapper for A3200 axes
"""Positioner wrapper for A3200 axes
Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC
does not provide a motor record, this class simply wraps axes into a
standard Ophyd positioner for the BEC. It also has some additional
standard Ophyd positioner for the BEC. It also has some additional
functionality for error checking and diagnostics.
Examples
@@ -56,19 +63,22 @@ class A3200Axis(PVPositioner):
base_pv : str (situational)
IOC PV name root, i.e. X06DA-ES if standalone class.
"""
USER_ACCESS = ['omove']
USER_ACCESS = ["omove"]
abr_mode_direct = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
abr_mode = Component(
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted)
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted
)
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
#setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
# setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
@@ -88,7 +98,7 @@ class A3200Axis(PVPositioner):
_rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config)
_rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config)
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
#_rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
# _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config)
@@ -96,37 +106,69 @@ class A3200Axis(PVPositioner):
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
def __init__(self, prefix="", *, name, base_pv="", kind=None, read_attrs=None,
configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs):
""" __init__ MUST have a full argument list"""
def __init__(
self,
prefix="",
*,
name,
base_pv="",
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
llm=0,
hlm=0,
vmin=0,
vmax=0,
**kwargs,
):
"""__init__ MUST have a full argument list"""
# Patching the parent's PVs into the axis class to check for direct/locked mode
if parent is None:
def maybe_add_prefix(self, instance, kw, suffix):
# Patched not to enforce parent prefix when no parent
if kw in self.add_prefix:
return suffix
return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(
maybe_add_prefix,
self.__class__.__dict__["abr_mode"])
maybe_add_prefix, self.__class__.__dict__["abr_mode"]
)
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(
maybe_add_prefix,
self.__class__.__dict__["abr_mode_direct"])
maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"]
)
logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs,
configuration_attrs=configuration_attrs, parent=parent, **kwargs)
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def omove(self, position, wait=True, timeout=None, moved_cb=None, velocity=None, relative=False,
direct=False, **kwargs) -> MoveStatus:
""" Native absolute/relative movement on the A3200 """
def omove(
self,
position,
wait=True,
timeout=None,
moved_cb=None,
velocity=None,
relative=False,
direct=False,
**kwargs,
) -> MoveStatus:
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode
if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"):
@@ -169,19 +211,18 @@ class A3200Axis(PVPositioner):
raise
return status
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
""" Exposes the ophyd move command through BEC abstraction"""
"""Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
""" Repeated single axis zigzag scan I guess PSO should be configured for this"""
"""Repeated single axis zigzag scan I guess PSO should be configured for this"""
self._rock_dist.put(distance)
self._rock_count.put(counts)
if velocity is not None:
self._rock_velo.put(velocity)
#if acceleration is not None:
# if acceleration is not None:
# self._rock_accel.put(acceleration)
self._rock.put(1)
@@ -190,7 +231,6 @@ class A3200Axis(PVPositioner):
status.wait()
return status
# def is_omega_ok(self):
# """Checks omega axis status"""
# return 0 == self.self.omega.status.get()
@@ -216,21 +256,11 @@ class A3200Axis(PVPositioner):
# time.sleep(0.2)
class A3200RasterScanner(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config)
x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config)
@@ -259,10 +289,11 @@ class A3200RasterScanner(Device):
# Also needs the command interface
_zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
_stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted)
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
def raster_setup(self, positions, columns, angle, etime):
"""configures parameters for a raster scan
@@ -334,7 +365,7 @@ class A3200RasterScanner(Device):
def raster_get_ready(self):
"""
WTF is this???
WTF is this???
"""
self.get_ready.set(1).wait()
@@ -342,9 +373,10 @@ class A3200RasterScanner(Device):
try:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==ABR_READY)
result = False if (timestamp_ == 0) else bool(value == ABR_READY)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
@@ -370,9 +402,10 @@ class A3200RasterScanner(Device):
def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1)
result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
@@ -389,9 +422,10 @@ class A3200RasterScanner(Device):
def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus:
# Define wait subscription
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==1)
result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
@@ -403,16 +437,17 @@ class A3200RasterScanner(Device):
return status
def complete(self, timeout=None) -> DeviceStatus:
""" Wait for the grid scanner to finish
"""Wait for the grid scanner to finish
TODO: Probably redundant with task status wait?
"""
if timeout is not None:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE)
result = False if (timestamp_ == 0) else bool(value == GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
@@ -425,11 +460,10 @@ class A3200RasterScanner(Device):
return status
class A3200Oscillator(Device):
"""No clue what this does, seems to be redundant with the task based grid scanners...
"""
ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config)
"""No clue what this does, seems to be redundant with the task based grid scanners..."""
ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config)
orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config)
etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config)
get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config)
@@ -460,9 +494,10 @@ class A3200Oscillator(Device):
RETURNS
SubscriptionStatus : A waitable status subscribed on 'phase' updates.
"""
# Define wait until the busy flag goes down (excluding initial update)
def in_status(*args, old_value, value, _, **kwargs):
result = bool(value==target)
result = bool(value == target)
# print(f"Old {old_value}\tNew: {value}\tResult: {result}")
return result
@@ -471,7 +506,6 @@ class A3200Oscillator(Device):
return status
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega")

View File

@@ -6,18 +6,20 @@ import requests
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
class SmarGonSignal(Signal):
""" SmarGonSignal (R/W)
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
@@ -28,7 +30,7 @@ class SmarGonSignal(Signal):
# self.get()
def put(self, value, *args, timestamp=None, **kwargs):
""" Overriden put to add communication with smargopolo"""
"""Overriden put to add communication with smargopolo"""
# Validate new value
self.check_value(value)
@@ -44,15 +46,16 @@ class SmarGonSignal(Signal):
self.value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value,
value=value, timestamp=self._timestamp)
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
""" Check if value falls within limits"""
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
@@ -70,9 +73,9 @@ class SmarGonSignal(Signal):
class SmarGonSignalRO(Signal):
""" Small helper class for read-only parameters PVs from SmarGon.
"""Small helper class for read-only parameters PVs from SmarGon.
TODO: Add monitoring
TODO: Add monitoring
"""
def __init__(self, *args, read_addr="readbackSCS", **kwargs):
@@ -111,8 +114,9 @@ class SmarGonAxis(Device):
This class controls the SmarGon goniometer via the REST interface.
"""
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
@@ -134,17 +138,25 @@ class SmarGonAxis(Device):
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit = None,
high_limit = None,
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}"
self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}"
self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit
self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit
self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}"
self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
@@ -160,7 +172,9 @@ class SmarGonAxis(Device):
cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}")
raise RuntimeError(
f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
@@ -168,19 +182,22 @@ class SmarGonAxis(Device):
cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}")
raise RuntimeError(
f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -3,6 +3,7 @@
Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels.
"""
import numpy as np
from bec_lib import bec_logger
@@ -11,9 +12,9 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
class AbrCmd:
""" Valid Aerotech ABR scan commands from the AeroBasic files"""
"""Valid Aerotech ABR scan commands from the AeroBasic files"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
@@ -38,9 +39,8 @@ class AbrCmd:
# SCAN_SASTT_V3 = 21
class AerotechFlyscanBase(AsyncFlyScanBase):
""" Base class for MX flyscans
"""Base class for MX flyscans
Low-level base class for standard scans at the PX beamlines at SLS. Theese
scans use the A3200 rotation stage and the actual experiment is performed
@@ -50,26 +50,27 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
configurations in child classes or pass it as command line parameters.
IMPORTANT: The AeroBasic scripts take care of the PSO configuration.
Parameters:
-----------
abr_complete : bool
Wait for the launched ABR task to complete.
"""
scan_type = "fly"
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config
abr_raster_reset =False
abr_raster_reset = False
abr_complete = False
abr_timeout = None
pointID = 0
num_pos = 0
def __init__(self, *args, parameter: dict = None, **kwargs):
""" Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
"""Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters.
"""
super().__init__(parameter=parameter, **kwargs)
@@ -81,14 +82,14 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self):
""" Mostly just checking if ABR stage is ok..."""
"""Mostly just checking if ABR stage is ok..."""
# TODO: Move roughly to start position???
# ABR status checking
stat = yield from self.stubs.send_rpc_and_wait("abr","status.get")
stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get")
if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
task = yield from self.stubs.send_rpc_and_wait("abr","task1.get")
task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get")
# From what I got values are copied to local vars at the start of scan,
# so only kickoff should be forbidden.
if task not in (1, "OK"):
@@ -106,8 +107,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
# return super().stage()
def scan_core(self):
""" The actual scan logic comes here.
"""
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
logger.info("Measurement launched on the ABR stage...")
@@ -127,9 +127,8 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
return super().cleanup()
class MeasureStandardWedge(AerotechFlyscanBase):
""" Standard wedge scan using the OMEGA motor
"""Standard wedge scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
@@ -148,22 +147,22 @@ class MeasureStandardWedge(AerotechFlyscanBase):
range : (float, float)
Scan range of the axis.
move_time : (float)
Total travel time for the movement [s].
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
"""
scan_name = "standardscan"
required_kwargs = ["start", "range", "move_time"]
class MeasureVerticalLine(AerotechFlyscanBase):
""" Vertical line scan using the GMY motor
"""Vertical line scan using the GMY motor
Simple relative continous line scan that records a single vertical line
with PSO output. There's no actual stepping, it's only used for velocity
calculation.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
@@ -180,19 +179,19 @@ class MeasureVerticalLine(AerotechFlyscanBase):
exp_time : float
Eeffective exposure time per step [s]
"""
scan_name = "vlinescan"
required_kwargs = ["exp_time", "range", "steps"]
class MeasureRasterSimple(AerotechFlyscanBase):
""" Simple raster scan
"""Simple raster scan
Measure a simplified relative zigzag raster scan in the X-Y plane.
The scan is relative assumes the goniometer is currently at the CENTER of
the first cell (i.e. TOP-LEFT). Each line is executed as a single continous
movement, i.e. there's no actual stepping in the X direction.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
@@ -213,16 +212,15 @@ class MeasureRasterSimple(AerotechFlyscanBase):
steps_y : int
Number of scan steps in Y (slow).
"""
scan_name = "rasterscan"
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
class MeasureScreening(AerotechFlyscanBase):
""" Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking
"""Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking
1 image/interval. This makes sure that we hit diffraction peaks if there
are any crystals.
@@ -242,11 +240,12 @@ class MeasureScreening(AerotechFlyscanBase):
steps : int
Number of blurred intervals.
exp_time : float
Exposure time per blurred interval [s].
Exposure time per blurred interval [s].
oscrange : float
Motion blurring of each interval [deg]
delta : float
Safety margin for sub-range movements (default: 0.5) [deg].
"""
scan_name = "screeningscan"
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]