Seuence scan via Ophyd works

This commit is contained in:
mohacsi_i 2024-02-19 15:55:32 +01:00
parent 853d621042
commit 9378d9f74c
3 changed files with 221 additions and 107 deletions

View File

@ -16,13 +16,56 @@ es1_aa1:
es1_aa1Tasks:
description: 'AA1 task management'
description: 'AA1 task management interface'
deviceClass: epics:devices:aa1Tasks
deviceConfig: {prefix: 'X02DA-ES1-SMP1:TASK:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
es1_aa1GlobalVar:
description: 'AA1 global variables interface'
deviceClass: epics:devices:aa1GlobalVariables
deviceConfig: {prefix: 'X02DA-ES1-SMP1:VAR:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
es1_aa1GlobalMon:
description: 'AA1 polled global variable interface'
deviceClass: epics:devices:aa1GlobalVariableBindings
deviceConfig: {prefix: 'X02DA-ES1-SMP1:VAR:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
es1_psod:
description: 'AA1 PSO output interface'
deviceClass: epics:devices:aa1AxisPsoDistance
deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY:PSO:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
es1_ddaq:
description: 'AA1 drive data collection interface'
deviceClass: epics:devices:aa1AxisDriveDataCollection
deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'}
onFailure: buffer
enabled: true
readoutPriority: monitored

View File

@ -1,5 +1,5 @@
from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal
from ophyd.status import Status, SubscriptionStatus, StatusBase
from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus
from time import sleep
import warnings
import numpy as np
@ -14,27 +14,60 @@ from typing import Union
from collections import OrderedDict
class EpicsMotorX(Device):
mot = Component(EpicsMotor, "", name='mot')
def move(position):
self.mot.move(position, wait=True)
def goto(position, wait=True):
super().move(position, wait)
def sleep(time_ms):
sleep(time_ms)
def epicsCharArray2String(raw: bytes) -> str:
""" Ophyd might convert strings to uint16 vector..."""
if isinstance(raw, np.ndarray):
raw = raw[:-1] if raw[-1]==0 else raw
sReply = ''.join(str(s, encoding='ASCII') for s in raw.astype(np.uint8))
print(f"Raw reply: {raw}\nConverted reply: {sReply}")
#print(f"Raw reply: {raw}\nConverted reply: {sReply}")
return sReply
else:
return raw
class EpicsSignalPassive(Device):
value = Component(EpicsSignalRO, "", kind=Kind.hinted)
value = Component(EpicsSignalRO, "", kind=Kind.omitted)
proc = Component(EpicsSignal, ".PROC", kind=Kind.omitted, put_complete=True)
def get(self):
self.proc.set(1).wait()
return self.value.get()
class EpicsPassiveRO(EpicsSignalRO):
"""Special helper class to work around a bug in ophyd (caproto backend)
that reads CHAR array strigs as uint16 arrays.
"""
def __init__(self, read_pv, *, string=False, name=None, **kwargs):
super().__init__(read_pv, string=string, name=name, **kwargs)
self._proc = EpicsSignal(read_pv+".PROC", kind=Kind.omitted, put_complete=True)
def wait_for_connection(self, *args, **kwargs):
super().wait_for_connection(*args, **kwargs)
self._proc.wait_for_connection(*args, **kwargs)
def get(self, *args, **kwargs):
self._proc.set(1).wait()
return super().get(*args, **kwargs)
@property
def value(self):
return super().value
class EpicsStringPassiveRO(EpicsSignalRO):
"""Special helper class to work around a bug in ophyd (caproto backend)
that reads CHAR array strigs as uint16 arrays.
@ -118,8 +151,8 @@ class aa1Controller(Device):
apiversion = Component(EpicsSignalRO, "API_VERSION", kind=Kind.config)
axiscount = Component(EpicsSignalRO, "AXISCOUNT", kind=Kind.config)
taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config)
fastpoll = Component(EpicsSignalRO, "RUNNING", auto_monitor=True, kind=Kind.hinted)
slowpoll = Component(EpicsSignalRO, "RUNNING", auto_monitor=True, kind=Kind.hinted)
fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.hinted)
slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.hinted)
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs):
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
@ -139,27 +172,20 @@ class aa1Tasks(Device):
warnStatus = Component(EpicsSignalRO, "WARNW", auto_monitor=True, kind=Kind.hinted)
taskIndex = Component(EpicsSignal, "TASKIDX", kind=Kind.config, put_complete=True)
switch = Component(EpicsSignal, "SWITCH", kind=Kind.config, put_complete=True)
_execute = Component(EpicsSignal, "EXECUTE", kind=Kind.config, put_complete=True)
_execute = Component(EpicsSignal, "EXECUTE", string=True, kind=Kind.config, put_complete=True)
_executeMode = Component(EpicsSignal, "EXECUTE-MODE", kind=Kind.config, put_complete=True)
_executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", auto_monitor=True)
_executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True)
fileName = Component(EpicsSignal, "FILENAME", kind=Kind.config, put_complete=True)
_fileList = Component(EpicsSignalPassive, "FILELIST", kind=Kind.config)
_fileRead = Component(EpicsSignalPassive, "FILEREAD", kind=Kind.config)
_fileWrite = Component(EpicsSignal, "FILEWRITE", kind=Kind.config, put_complete=True)
fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True)
_fileRead = Component(EpicsPassiveRO, "FILEREAD", string=True, kind=Kind.omitted)
_fileWrite = Component(EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True)
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs):
""" __init__ MUST have a full argument list"""
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self._currentTaskMonitor = None
self._textToExecute = None
def listFiles(self) -> list:
""" List all available files on the controller """
# Have to use CHAR array due to EPICS LSI bug...
namesbytes = self._fileList.get().astype(np.uint8).tobytes()
nameslist = namesbytes.decode('ascii').split('\t')
return nameslist
self._isConfigured = False
def readFile(self, filename: str) -> str:
""" Read a file from the controller """
@ -181,14 +207,14 @@ class aa1Tasks(Device):
""" Run a script file that either exists, or is newly created and compiled"""
self.configure(text=filetext, filename=filename, taskIndex=taskIndex)
self.trigger(settle_time=settle_time).wait()
self.trigger().wait()
def execute(self, text: str, taskIndex: int=3, mode: str=0, settle_time=0.5):
""" Run a short text command on the Automation1 controller"""
print(f"Executing program on task: {taskIndex}")
self.configure(text=text, taskIndex=taskIndex, mode=mode)
self.trigger(settle_time=settle_time).wait()
self.trigger().wait()
if mode in [0, "None", None]:
return None
@ -196,22 +222,24 @@ class aa1Tasks(Device):
raw = self._executeReply.get()
return epicsCharArray2String(raw)
def configure(self, text: str=None, filename: str=None, taskIndex: int=4, settle_time=None, **kwargs):
""" Interface for configuration """
def configure(self, d: dict={}) -> tuple:
""" Configuration interface for flying
"""
# Unrolling the configuration dict
text = str(d['text']) if 'text' in d else None
filename = str(d['filename']) if 'filename' in d else None
taskIndex = int(d['taskIndex']) if 'taskIndex' in d else 4
settle_time = float(d['settle_time']) if 'settle_time' in d else None
mode = d['mode'] if 'mode' in d else None
# Validation
if taskIndex < 0 or taskIndex > 31:
if taskIndex < 1 or taskIndex > 31:
raise RuntimeError(f"Invalid task index: {taskIndex}")
if (text is None) and (filename is None):
raise RuntimeError("Task execution requires either AeroScript text or filename")
if 'mode' in kwargs:
if kwargs['mode'] not in [0, 1, 2, 3, 4, "None", "Axis", "Int", "Double", "String", None]:
raise RuntimeError(f"Unknown execution mode: {kwargs['mode']}")
if kwargs['mode'] is None:
kwargs['mode'] = "None"
else:
kwargs['mode'] = "None"
# common operations
# Common operations
old = self.read_configuration()
self.taskIndex.set(taskIndex).wait()
self._textToExecute = None
#self._currentTaskMonitor = aa1TaskState()
@ -236,42 +264,60 @@ class aa1Tasks(Device):
self._textToExecute = None
else:
raise RuntimeError("Unsupported filename-text combo")
self._isConfigured = True
new = self.read_configuration()
return (old, new)
##########################################################################
# Bluesky stepper interface
def stage(self) -> None:
""" Default staging """
print("Staging")
super().stage()
def unstage(self) -> None:
""" Default unstaging """
print("Unstaging")
super().unstage()
def trigger(self, settle_time=0.2) -> StatusBase:
def trigger(self, settle_time=0.2) -> Status:
""" Execute the script on the configured task"""
if self._textToExecute is not None:
status = self._execute.set(self._textToExecute, settle_time=settle_time)
print("Triggering")
if self._isConfigured:
if self._textToExecute is not None:
status = self._execute.set(self._textToExecute, settle_time=settle_time)
else:
status = self.switch.set("Run", settle_time=settle_time)
else:
status = self.switch.set("Run", settle_time=settle_time)
status = Status()
status.set_finished()
return status
def stop(self):
""" Stop the currently selected task """
print(type(self.switch))
self.switch.set("Stop").wait()
##########################################################################
# Flyer interface
def kickoff(self) -> StatusBase:
def kickoff(self) -> DeviceStatus:
""" Execute the script on the configured task"""
if self._textToExecute is not None:
status = self._execute.set(self._textToExecute, settle_time=0.5)
print("Kickoff")
if self._isConfigured:
if self._textToExecute is not None:
status = self._execute.set(self._textToExecute, settle_time=0.5)
else:
status = self.switch.set("Run", settle_time=0.1)
else:
status = self.switch.set("Run", settle_time=0.1)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> StatusBase:
def complete(self) -> DeviceStatus:
""" Execute the script on the configured task"""
print("Complete")
#return self._currentTaskMonitor.complete()
status = StatusBase()
status = DeviceStatus(self)
status.set_finished()
return status
@ -281,6 +327,7 @@ class aa1Tasks(Device):
return {self.name: dd}
def collect(self) -> OrderedDict:
print("Collecting")
ret = OrderedDict()
ret["timestamps"] = {"success": time.time()}
ret["data"] = {"success": 1}
@ -305,7 +352,6 @@ class aa1TaskState(Device):
def notRunning(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_== 0) else (value not in ["Running", 4])
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
@ -313,8 +359,8 @@ class aa1TaskState(Device):
status = SubscriptionStatus(self.status, notRunning, settle_time=0.5)
return status
def kickoff(self) -> StatusBase:
status = StatusBase()
def kickoff(self) -> DeviceStatus:
status = DeviceStatus(self)
status.set_finished()
return status
@ -331,7 +377,7 @@ class aa1TaskState(Device):
class aa1DataAcquisition(Device):
""" Controller Data Acquisition
""" Controller Data Acquisition - DONT USE at Tomcat
This class implements the controller data collection feature of the
Automation1 controller. This feature logs various inputs at a
@ -466,20 +512,20 @@ class aa1GlobalVariables(Device):
integer_addr = Component(EpicsSignal, "INT-ADDR", kind=Kind.omitted, put_complete=True)
integer_size = Component(EpicsSignal, "INT-SIZE", kind=Kind.omitted, put_complete=True)
integer = Component(EpicsSignal, "INT", kind=Kind.omitted, put_complete=True)
integer_rb = Component(EpicsSignalPassive, "INT-RBV", kind=Kind.omitted)
integer_rb = Component(EpicsPassiveRO, "INT-RBV", kind=Kind.omitted)
integerarr = Component(EpicsSignal, "INTARR", kind=Kind.omitted, put_complete=True)
integerarr_rb = Component(EpicsSignalPassive, "INTARR-RBV", kind=Kind.omitted)
integerarr_rb = Component(EpicsPassiveRO, "INTARR-RBV", kind=Kind.omitted)
real_addr = Component(EpicsSignal, "REAL-ADDR", kind=Kind.omitted, put_complete=True)
real_size = Component(EpicsSignal, "REAL-SIZE", kind=Kind.omitted, put_complete=True)
real = Component(EpicsSignal, "REAL", kind=Kind.omitted, put_complete=True)
real_rb = Component(EpicsSignalPassive, "REAL-RBV", kind=Kind.omitted)
real_rb = Component(EpicsPassiveRO, "REAL-RBV", kind=Kind.omitted)
realarr = Component(EpicsSignal, "REALARR", kind=Kind.omitted, put_complete=True)
realarr_rb = Component(EpicsSignalPassive, "REALARR-RBV", kind=Kind.omitted)
realarr_rb = Component(EpicsPassiveRO, "REALARR-RBV", kind=Kind.omitted)
string_addr = Component(EpicsSignal, "STRING-ADDR", kind=Kind.omitted, put_complete=True)
string = Component(EpicsSignal, "STRING", kind=Kind.omitted, put_complete=True)
string_rb = Component(EpicsStringPassiveRO, "STRING-RBV", kind=Kind.omitted)
string = Component(EpicsSignal, "STRING", string=True, kind=Kind.omitted, put_complete=True)
string_rb = Component(EpicsPassiveRO, "STRING-RBV", string=True, kind=Kind.omitted)
def readInt(self, address: int, size: int=None) -> int:
""" Read a 64-bit integer global variable """
@ -595,12 +641,11 @@ class aa1GlobalVariableBindings(Device):
float18 = Component(EpicsSignal, "REAL18_RBV", write_pv="REAL18", put_complete=True, auto_monitor=True, name="float18", kind=Kind.hinted)
float19 = Component(EpicsSignal, "REAL19_RBV", write_pv="REAL19", put_complete=True, auto_monitor=True, name="float19", kind=Kind.hinted)
_str0 = Component(EpicsStringRO, "STR0_RBV", auto_monitor=True, name="str0_raw", kind=Kind.omitted)
str0 = Component(DerivedSignal, "_str0", inverse=epicsCharArray2String, auto_monitor=True, name="str0", kind=Kind.hinted)
str1 = Component(EpicsStringRO, "STR1_RBV", auto_monitor=True, name="str1", kind=Kind.hinted)
str4 = Component(EpicsString, "STR4_RBV", put_complete=True, auto_monitor=True, write_pv="STR4", name="str4", kind=Kind.hinted)
str5 = Component(EpicsString, "STR5_RBV", put_complete=True, auto_monitor=True, write_pv="STR5", name="str4", kind=Kind.hinted)
# BEC LiveTable crashes on non-numeric values
str0 = Component(EpicsSignalRO, "STR0_RBV", auto_monitor=True, string=True, name="str0", kind=Kind.config)
str1 = Component(EpicsSignalRO, "STR1_RBV", auto_monitor=True, string=True, name="str1", kind=Kind.config)
str4 = Component(EpicsString, "STR4_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR4", name="str4", kind=Kind.config)
str5 = Component(EpicsString, "STR5_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR5", name="str5", kind=Kind.config)
@ -735,33 +780,32 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
refer to Automation1's online manual.
"""
def __init__(self, *args, **kwargs):
""" Member declarations in init"""
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs):
""" __init__ MUST have a full argument list"""
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self._Vdistance = 3.141592
super().__init__(*args, **kwargs)
# ########################################################################
# PSO high level interface
def configure(self, distance: Union[float, np.ndarray, list, tuple], wmode: str,
t_pulse: float=2000, w_pulse: float=5000, n_pulse: int=1,
posInput: int=None, pinOutput: int=None, **argv) -> None:
def configure(self, d: dict={}) -> tuple:
""" Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually pulsed/toggled.
"""
distance = d['distance']
wmode = str(d['wmode'])
t_pulse = float(d['t_pulse']) if 't_pulse' in d else None
w_pulse = float(d['w_pulse']) if 'w_pulse' in d else None
n_pulse = float(d['n_pulse']) if 'n_pulse' in d else None
# Validate input parameters
if wmode not in ["pulse", "pulsed", "toggle", "toggled"]:
raise RuntimeError(f"Unsupported distace triggering mode: {wmode}")
# Set the position data source and output pin
if posInput is not None:
self.posInput.set(posInput).wait()
if pinOutput is not None:
self.pinOutput.set(pinOutput).wait()
old = self.read_configuration()
# Configure distance generator (also resets counter to 0)
self._distanceValue = distance
if isinstance(distance, (float, int)):
@ -784,9 +828,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
self.waveEnable.set("On").wait()
self.waveMode.set("Pulse").wait()
# Setting pulse shape
self.pulseWindow.set(w_pulse).wait()
self.pulseOnTime.set(t_pulse).wait()
self.pulseCount.set(n_pulse).wait()
if w_pulse is not None:
self.pulseWindow.set(w_pulse).wait()
if t_pulse is not None:
self.pulseOnTime.set(t_pulse).wait()
if n_pulse is not None:
self.pulseCount.set(n_pulse).wait()
# Commiting configuration
self.pulseApply.set(1).wait()
# Enabling PSO waveform outputs
@ -801,11 +848,14 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# Set PSO output data source
self.outSource.set("Waveform").wait()
new = self.read_configuration()
return (old, new)
# ########################################################################
# Bluesky step scan interface
def stage(self, settle_time=None):
self.dstEventsEna.set("On").wait()
if isinstance(self._distanceValue, (np.ndarray, list, tuple)):
if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
self.dstCounterEna.set("On").wait()
if settle_time is not None:
@ -822,21 +872,21 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
return super().unstage()
# ########################################################################
# Bluesky flyer interface
def kickoff(self) -> StatusBase:
def kickoff(self) -> DeviceStatus:
# Rearm the configured array
if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
# Start monitoring the counters
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
status = StatusBase()
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> StatusBase:
def complete(self) -> DeviceStatus:
""" Bluesky flyer interface"""
# Array mode waits until the buffer is empty
if isinstance(self._distanceValue, (np.ndarray, list, tuple)):
if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)):
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def notRunning(*args, old_value, value, timestamp, **kwargs):
@ -850,7 +900,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
status = SubscriptionStatus(self.status, notRunning, settle_time=0.5)
else:
# In distance trigger mode there's no specific goal
status = StatusBase()
status = DeviceStatus(self)
status.set_finished()
return status
@ -887,34 +937,35 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
For a more detailed description of additional signals and masking plase
refer to Automation1's online manual.
"""
def __init__(self, *args, **kwargs):
""" Member declarations in init"""
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs):
""" __init__ MUST have a full argument list"""
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self._mode = "output"
self._eventMode = "Enter"
super().__init__(*args, **kwargs)
# ########################################################################
# PSO high level interface
def configure(self, bounds: Union[np.ndarray, list, tuple], wmode: str, emode: str="Enter",
t_pulse: float=2000, w_pulse: float=5000, n_pulse: int=1,
posInput: int=None, pinOutput: int=None, **argv) -> None:
def configure(self, d: dict={}) -> tuple:
""" Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually output/pulsed/toggled.
"""
bounds = d['distance']
wmode = str(d['wmode'])
emode = str(d['emode'])
t_pulse = float(d['t_pulse']) if 't_pulse' in d else None
w_pulse = float(d['w_pulse']) if 'w_pulse' in d else None
n_pulse = float(d['n_pulse']) if 'n_pulse' in d else None
# Validate input parameters
if wmode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]:
raise RuntimeError(f"Unsupported window triggering mode: {wmode}")
self._mode = wmode
self._eventMode = emode
# Set the position data source and output pin
if posInput is not None:
self.posInput.set(posInput).wait()
if pinOutput is not None:
self.outPin.set(pinOutput).wait()
old = self.read_configuration()
# Configure the window module
# Set the window ranges (MUST be in start position)
@ -959,6 +1010,9 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
elif wmode in ["output", "flag"]:
self.outSource.set("Window").wait()
new = self.read_configuration()
return (old, new)
def stage(self, settle_time=None):
if self.outSource.get() in ["Window", 2]:
self.winOutput.set("On").wait()
@ -1017,16 +1071,22 @@ class aa1AxisDriveDataCollection(Device):
_buffer0 = Component(EpicsSignalRO, "BUFFER0", auto_monitor=True, kind=Kind.hinted)
_buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.hinted)
def configure(self, npoints,
trigger: int=DriveDataCaptureTrigger.PsoOutput,
source0: int=DriveDataCaptureInput.PrimaryFeedback,
source1: int=DriveDataCaptureInput.PositionCommand):
def configure(self, d: dict={}) -> tuple:
npoints = int(d['npoints'])
trigger = int(d['trigger']) if 'trigger' in d else DriveDataCaptureTrigger.PsoOutput
source0 = int(d['source0']) if 'source0' in d else DriveDataCaptureInput.PrimaryFeedback
source1 = int(d['source1']) if 'source1' in d else DriveDataCaptureInput.PositionCommand
old = self.read_configuration()
self._input0.set(source0).wait()
self._input1.set(source1).wait()
self._trigger.set(trigger).wait()
# This allocates the memory...
self.npoints.set(npoints).wait()
new = self.read_configuration()
return (old, new)
# Bluesky step scanning interface
def stage(self, settle_time=0.1):
@ -1038,15 +1098,16 @@ class aa1AxisDriveDataCollection(Device):
def unstage(self, settle_time=0.1):
self._switch.set("Stop", settle_time=settle_time).wait()
super().unstage()
print(f"Recorded samples: {self.nsamples_rbv.value}")
# Bluesky flyer interface
def kickoff(self, settle_time=0.1) -> Status:
status = self._switch.set("Start", settle_time=settle_time)
return status
def complete(self, settle_time=0.1) -> Status:
def complete(self, settle_time=0.1) -> DeviceStatus:
""" DDC just reads back whatever is available in the buffers"""
status = Status(settle_time=settle_time)
status = DeviceStatus(self, settle_time=settle_time)
status.set_finished()
return status
@ -1100,11 +1161,22 @@ class aa1AxisDriveDataCollection(Device):
# Automatically start simulation if directly invoked
if __name__ == "__main__":
AA1_IOC_NAME = "X02DA-ES1-SMP1"
AA1_AXIS_NAME = "ROTY"
# Drive data collection
tcDdc = aa1AxisDriveDataCollection("X02DA-ES1-SMP1:ROTY:DDC:", name="tcddc")
tcDdc.wait_for_connection()
task = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk")
task.wait_for_connection()
task.describe()
ddc = aa1AxisDriveDataCollection("X02DA-ES1-SMP1:ROTY:DDC:", name="ddc")
ddc.wait_for_connection()
globb = aa1GlobalVariableBindings(AA1_IOC_NAME+":VAR:", name="globb")
globb.wait_for_connection()
globb.describe()
mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x")
mot.wait_for_connection()

View File

@ -29,5 +29,4 @@ from .specMotors import (
PmMonoBender,
)
from .SpmBase import SpmBase
from .XbpmBase import XbpmBase, XbpmCsaxsOp
from .AerotechAutomation1 import aa1Controller, aa1Tasks
from .AerotechAutomation1 import aa1Controller, aa1Tasks, aa1GlobalVariables, aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisDriveDataCollection, EpicsMotorX