Daily commit mit some patching

This commit is contained in:
Unknown MX Person
2024-09-19 17:30:13 +02:00
parent 2563471ac8
commit 14ca9bd74a
5 changed files with 96 additions and 59 deletions

View File

@@ -15,6 +15,7 @@ slh_trxw:
readOnly: false
softwareTrigger: false
fi1_try:
description: Beam attenuator in OP
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY'}
onFailure: buffer
@@ -193,6 +194,7 @@ vfm_trydw:
readOnly: false
softwareTrigger: false
vfm_pitch:
description: KB mirror vertical steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:PITCH'}
onFailure: buffer
@@ -273,6 +275,7 @@ hfm_trydr:
readOnly: false
softwareTrigger: false
hfm_pitch:
description: KB mirror horizontal steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:PITCH'}
enabled: false
@@ -313,6 +316,7 @@ hfm_try:
readOnly: false
softwareTrigger: false
xbox_diode:
description: Exposure box diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DI1:READOUT'}
onFailure: buffer
@@ -321,6 +325,7 @@ xbox_diode:
readOnly: true
softwareTrigger: false
bstop_diode:
description: Beamstop diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT'}
onFailure: buffer
@@ -329,14 +334,16 @@ bstop_diode:
readOnly: true
softwareTrigger: false
omega:
description: ABR rotation stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA'}
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
abr:
description: Aerotech ABR motion system
deviceClass: pxiii_bec.devices.AerotechAbrStage
deviceConfig: {prefix: X06DA-ES}
onFailure: buffer

View File

@@ -85,7 +85,7 @@ from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import MoveStatus, SubscriptionStatus, DeviceStatus
from .aeroA3200 import A3200Axis, A3200RasterScanner, A3200Oscillator
from .A3200utils import A3200Axis, A3200RasterScanner, A3200Oscillator
ABR_DONE = 0
@@ -145,6 +145,8 @@ class AerotechAbrStage(Device):
an AeroBasic program on the controller and we communicate to it via 10+1
global variables.
"""
USER_ACCESS = ['reset', 'kickoff', 'complete']
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
@@ -196,12 +198,17 @@ class AerotechAbrStage(Device):
def configure(self, d: dict) -> tuple:
"""" Configure the exposure scripts
Performs the configuration of the exposure scrips, i.e. sets the
required global variables.
Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by
setting a pre-defined set of global variables. This method performs the
configuration of the exposure scrips by setting the required global
variables.
Parameters
----------
scan_command:
Parameters in d: dict
---------------------
scan_command: int
The index of the desired AeroBasic program to be executed.
Usually supported values are taken from an Enum.
var_1:
var_2:
var_3:
@@ -218,8 +225,7 @@ class AerotechAbrStage(Device):
# ToDo: Check if idle before reconfiguring
# Set the corresponding global variables
if "scan_command" in d:
self.scan_command.set(d['scan_command']).wait()
self.scan_command.set(d['scan_command']).wait()
if "var_1" in d and d['var_1'] is not None:
self._var_1.set(d['var_1']).wait()
if "var_2" in d and d['var_2'] is not None:
@@ -271,11 +277,12 @@ class AerotechAbrStage(Device):
Executes the homing procedure and waits (default) until it is completed.
TODO: Return a status object to do this wwith futures and monitoring.
PARAMETERS
`wait` true / false if the routine is to wait for the homing to finish.
"""
self.omega.home.set(1, settle_time=1).wait()
poll(1.0)
if not wait:
return
while not self.omega.is_homed():

View File

@@ -4,11 +4,15 @@ Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
from ophyd import Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind
import types
from ophyd import Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
from bec_lib import bec_logger
logger = bec_logger.logger
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
@@ -24,7 +28,13 @@ class A3200Axis(PVPositioner):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
functionality for diagnostics.
"""
USER_ACCESS = ['omove']
abr_mode_direct = Component(EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted)
abr_mode = Component(EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted)
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
@@ -58,52 +68,51 @@ class A3200Axis(PVPositioner):
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs):
def __init__(self, prefix="", *, name, base_pv="", kind=None, read_attrs=None, configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs):
""" __init__ MUST have a full argument list"""
""" Patching the parent's PVs into the axis class to check for direct/locked mode"""
if parent is None:
def maybe_add_prefix(self, instance, kw, suffix):
""" Patched not to enforce parent prefix"""
if kw in self.add_prefix:
return suffix
return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(maybe_add_prefix, self.__class__.__dict__["abr_mode"])
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"])
logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def move(self, position, velocity=None, wait=True, relative=False, direct=False, **kwargs) -> MoveStatus:
""" Native absolute movement on the A3200 """
# ToDo: Ensure we can stop (if a stop_signal is configured).
def move(self, position, velocity=None, wait=True, relative=False, direct=False, timeout=None, **kwargs) -> MoveStatus:
""" Native absolute/relative movement on the A3200 """
# Check if we're in direct movement mode
if self.parent is not None:
if self.parent.mode != DIRECT_MODE:
if direct:
self.parent.set_direct_mode()
else:
raise RuntimeError("ABR is not in direct mode!")
# Set velocity if provided
if velocity is not None:
self.velo.set(velocity).wait()
# Set up relative motion
if relative:
raise NotImplementedError("Relative movement is not yet supported by the ophyd device")
return super().move(position, wait=wait, **kwargs)
def rmove(self, distance, wait=True, velo=None, timeout=None, moved_cb=None) -> Status:
""" Native relative movement on the A3200 """
if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"):
if direct:
self.abr_mode_direct.set(1).wait()
else:
raise RuntimeError(f"ABR axis not in direct mode: {self.abr_mode.value}")
# Before moving, ensure we can stop (if a stop_signal is configured).
if self.stop_signal is not None:
self.stop_signal.wait_for_connection()
if velo is not None:
self.velo.set(velo).wait()
# Set velocity if provided
if velocity is not None:
self.velocity.set(velocity).wait()
# Issue move command
status = super().move(position=distance, timeout=timeout, moved_cb=moved_cb)
# This is adapted from pv_positioner.py
self.check_value(position)
# Get MoveStatus from parent of parent
status = PositionerBase.move(self, position=position, timeout=timeout)
has_done = self.done is not None
if not has_done:
@@ -112,16 +121,24 @@ class A3200Axis(PVPositioner):
self._move_changed(value=moving_val)
try:
# Relative movement instead of setpoint
self.relmove.put(distance)
if relative:
# Relative movement instead of setpoint
self.relmove.put(position, wait=True)
else:
# Standard absolute movement
self.setpoint.put(position, wait=True)
if wait:
status.wait()
except KeyboardInterrupt:
self.stop()
raise
raise
return status
def omove(self, position, velocity=None, wait=True, relative=False, direct=False, **kwargs) -> MoveStatus:
""" Exposes the ophyd move command through BEC abstraction"""
return self.move(position, velocity, wait, relative, direct, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
""" Repeated single axis zigzag scan I guess PSO should be configured for this"""

View File

@@ -1,2 +1,2 @@
from .aeroA3200 import A3200Axis
from .A3200 import AerotechAbrStage
from .A3200 import AerotechAbrStage
from .A3200utils import A3200Axis

View File

@@ -23,18 +23,18 @@ class AbrCmd:
VERTICAL_LINE_SCAN = 3
SCREENING = 4
SUPER_FAST_OMEGA = 5
STILL_WEDGE = 6
STILLS = 7
REPEAT_SINGLE_OSCILLATION = 8
STILL_WEDGE = 6 # NOTE: Unused
STILLS = 7 # NOTE: Unused
REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
SINGLE_OSCILLATION = 9
OLD_FASHIONED = 10
OLD_FASHIONED = 10 # NOTE: Unused
RASTER_SCAN = 11
JET_ROTATION = 12
X_HELICAL = 13
X_RUNSEQ = 14
JET_ROTATION = 12 # NOTE: Unused
X_HELICAL = 13 # NOTE: Unused
X_RUNSEQ = 14 # NOTE: Unused
JUNGFRAU = 15
MSOX = 16
SLIT_SCAN = 17
MSOX = 16 # NOTE: Unused
SLIT_SCAN = 17 # NOTE: Unused
RASTER_SCAN_STILL = 18
SCAN_SASTT = 19
SCAN_SASTT_V2 = 20
@@ -87,6 +87,12 @@ class AerotechFlyscanBase(AsyncFlyScanBase):
def pre_scan(self):
# ToDo: Move roughly to start position
stat = yield from self.stubs.send_rpc_and_wait("abr","status.get")
if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
if self.abr_reset:
yield from self.stubs.send_rpc_and_wait("abr", "reset")
# Reset the scan engine
if self.abr_reset: