Quickly commiting to safety

This commit is contained in:
gac-x05la
2024-09-03 17:05:47 +02:00
parent 015be3f281
commit 78ffcf3fa3

View File

@@ -13,69 +13,12 @@ from .AerotechAutomation1Enums import (
DriveDataCaptureTrigger,
)
class EpicsMotorX(EpicsMotor):
"""Special motor class that provides flyer interface and progress bar."""
SUB_PROGRESS = "progress"
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self._startPosition = None
self._targetPosition = None
self.subscribe(self._progress_update, run=False)
def configure(self, d: dict):
if "target" in d:
self._targetPosition = d["target"]
del d["target"]
if "position" in d:
self._targetPosition = d["position"]
del d["position"]
return super().configure(d)
def kickoff(self):
self._startPosition = float(self.position)
return self.move(self._targetPosition, wait=False)
def move(self, position, wait=True, **kwargs):
self._startPosition = float(self.position)
return super().move(position, wait, **kwargs)
def _progress_update(self, value, **kwargs) -> None:
"""Progress update on the scan"""
if (self._startPosition is None) or (self._targetPosition is None) or (not self.moving):
self._run_subs(sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1)
return
progress = np.abs(
(value - self._startPosition) / (self._targetPosition - self._startPosition)
)
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=int(100 * progress),
max_value=max_value,
done=int(np.isclose(max_value, progress, 1e-3)),
)
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("GfCam")
class EpicsPassiveRO(EpicsSignalRO):
@@ -93,42 +36,24 @@ class EpicsPassiveRO(EpicsSignalRO):
self._proc.set(1).wait()
return super().get(*args, **kwargs)
@property
def value(self):
return super().value
# @property
# def value(self):
# return super().value
class aa1Controller(Device):
"""Ophyd proxy class for the Aerotech Automation 1's core controller functionality"""
# ToDo: Add error subscription
controllername = Component(EpicsSignalRO, "NAME", kind=Kind.config)
serialnumber = Component(EpicsSignalRO, "SN", kind=Kind.config)
apiversion = Component(EpicsSignalRO, "API_VERSION", kind=Kind.config)
axiscount = Component(EpicsSignalRO, "AXISCOUNT", kind=Kind.config)
taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config)
fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.hinted)
slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.hinted)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.normal)
slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.normal)
errno = Component(EpicsSignalRO, "ERRNO", auto_monitor=True, kind=Kind.hinted)
errnmsg = Component(EpicsSignalRO, "ERRMSG", auto_monitor=True, kind=Kind.hinted)
class aa1Tasks(Device):
@@ -161,8 +86,11 @@ class aa1Tasks(Device):
'''
"""
_current_task = None
_text_to_execute = None
_is_configured = False
_is_stepconfig = False
SUB_PROGRESS = "progress"
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.hinted)
errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.hinted)
warnStatus = Component(EpicsSignalRO, "WARNW", auto_monitor=True, kind=Kind.hinted)
@@ -200,42 +128,6 @@ class aa1Tasks(Device):
parent=parent,
**kwargs,
)
self._currentTask = None
self._textToExecute = None
self._isConfigured = False
self._isStepConfig = False
self.subscribe(self._progress_update, "progress", run=False)
def _progress_update(self, value, **kwargs) -> None:
"""Progress update on the scan"""
value = self.progress()
self._run_subs(sub_type=self.SUB_PROGRESS, value=value, max_value=1, done=1)
def _progress(self) -> None:
"""Progress update on the scan"""
if self._currentTaskMonitor is None:
return 1
else:
if self._currentTaskMonitor.status.value in ["Running", 4]:
return 0
else:
return 1
def readFile(self, filename: str) -> str:
"""Read a file from the controller"""
# Have to use CHAR array due to EPICS LSI bug...
self.fileName.set(filename).wait()
filebytes = self._fileRead.get()
# C-strings terminate with trailing zero
if filebytes[-1] == 0:
filebytes = filebytes[:-1]
filetext = filebytes
return filetext
def writeFile(self, filename: str, filetext: str) -> None:
"""Write a file to the controller"""
self.fileName.set(filename).wait()
self._fileWrite.set(filetext).wait()
def runScript(self, filename: str, taskIndex: int == 2, filetext=None, settle_time=0.5) -> None:
"""Run a script file that either exists, or is newly created and compiled"""
@@ -245,103 +137,83 @@ class aa1Tasks(Device):
self.trigger().wait()
print("Runscript waited")
def execute(self, text: str, taskIndex: int = 3, mode: str = 0, settle_time=0.5):
def execute(self, text: str, taskIndex: int = 4, mode: str = 0):
"""Run a short text command on the Automation1 controller"""
print(f"Executing program on task: {taskIndex}")
logger.info(f"[{self.name}] Launching program execution on task: {taskIndex}")
self.configure({"text": text, "taskIndex": taskIndex, "mode": mode})
self.kickoff().wait()
if mode in [0, "None", None]:
return None
else:
raw = self._executeReply.get()
return raw
raw = self._executeReply.get()
return raw
def configure(self, d: dict = {}) -> tuple:
def configure(self, d: dict) -> tuple:
"""Configuration interface for flying"""
# Unrolling the configuration dict
text = str(d["text"]) if "text" in d else None
filename = str(d["filename"]) if "filename" in d else None
taskIndex = int(d["taskIndex"]) if "taskIndex" in d else 4
settle_time = float(d["settle_time"]) if "settle_time" in d else None
mode = d["mode"] if "mode" in d else None
self._isStepConfig = d["stepper"] if "stepper" in d else False
text = d.get("text", None)
filename = d.get("filename", None)
task_index = d.get("taskIndex", 4)
mode = d.get("mode", None)
self._is_stepconfig = d.get("stepper", False)
# Validation
if taskIndex < 1 or taskIndex > 31:
raise RuntimeError(f"Invalid task index: {taskIndex}")
if task_index < 1 or task_index > 31:
raise RuntimeError(f"Invalid task index: {task_index}")
if (text is None) and (filename is None):
raise RuntimeError("Task execution requires either AeroScript text or filename")
# Common operations
old = self.read_configuration()
self.taskIndex.set(taskIndex).wait()
self._textToExecute = None
self._currentTask = taskIndex
self.taskIndex.set(task_index).wait()
self._text_to_execute = None
self._current_task = task_index
# Choose the right execution mode
if (filename is None) and (text not in [None, ""]):
# Direct command execution
# Direct command execution from string
print("Preparing for direct command execution")
logger.info(f"[{self.name}] Preparing for direct text command execution")
if mode is not None:
self._executeMode.set(mode).wait()
self._textToExecute = text
self._text_to_execute = text
elif (filename is not None) and (text in [None, ""]):
# Execute existing file
# Execute an existing file
logger.info(f"[{self.name}] Preparing to execute existing file '{filename}'")
self.fileName.set(filename).wait()
self.switch.set("Load").wait()
self._text_to_execute = None
elif (filename is not None) and (text not in [None, ""]):
print("Preparing to execute via intermediary file")
logger.info(f"[{self.name}] Preparing to execute text via intermediary file '{filename}'")
# Execute text via intermediate file
self.taskIndex.set(taskIndex).wait()
self.taskIndex.set(task_index).wait()
self.fileName.set(filename).wait()
self._fileWrite.set(text).wait()
self.switch.set("Load").wait()
self._textToExecute = None
self._text_to_execute = None
else:
raise RuntimeError("Unsupported filename-text combo")
if self._failure.value:
raise RuntimeError("Failed to launch task, please check the Aerotech IOC")
self._isConfigured = True
self._is_configured = True
new = self.read_configuration()
return (old, new)
##########################################################################
# Bluesky stepper interface
def stage(self) -> None:
"""Default staging"""
super().stage()
def unstage(self) -> None:
"""Default unstaging"""
super().unstage()
def trigger(self, settle_time=0.2) -> Status:
"""Execute the script on the configured task"""
if self._isStepConfig:
return self.kickoff(settle_time)
else:
status = DeviceStatus(self, settle_time=settle_time)
status.set_finished()
if settle_time is not None:
sleep(settle_time)
return status
# Bluesky flyer interface
def stop(self):
"""Stop the currently selected task"""
self.switch.set("Stop").wait()
##########################################################################
# Flyer interface
def kickoff(self, settle_time=0.2) -> DeviceStatus:
"""Execute the script on the configured task"""
if self._isConfigured:
if self._textToExecute is not None:
print(f"Kickoff directly executing string: {self._textToExecute}")
status = self._execute.set(self._textToExecute, settle_time=0.5)
print(f"Kickoff directly executing string: {self._text_to_execute}")
status = self._execute.set(self._text_to_execute, settle_time=0.5)
else:
status = self.switch.set("Run", settle_time=0.1)
else:
@@ -352,39 +224,21 @@ class aa1Tasks(Device):
return status
def complete(self) -> DeviceStatus:
"""Execute the script on the configured task"""
print("Called aa1Task.complete()")
""" Wait for a RUNNING task"""
timestamp_ = 0
taskIdx = int(self.taskIndex.get())
task_idx = int(self.taskIndex.get())
def notRunning2(*args, old_value, value, timestamp, **kwargs):
def not_running(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if value[taskIdx] in ["Running", 4] else True
result = False if value[task_idx] in ["Running", 4] else True
timestamp_ = timestamp
print(result)
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.taskStates, notRunning2, settle_time=0.5)
status = SubscriptionStatus(self.taskStates, not_running, settle_time=0.5)
return status
def describe_collect(self) -> OrderedDict:
dd = OrderedDict()
dd["success"] = {
"source": "internal",
"dtype": "integer",
"shape": [],
"units": "",
"lower_ctrl_limit": 0,
"upper_ctrl_limit": 0,
}
return {self.name: dd}
def collect(self) -> OrderedDict:
ret = OrderedDict()
ret["timestamps"] = {"success": time.time()}
ret["data"] = {"success": 1}
yield ret
class aa1TaskState(Device):
@@ -400,12 +254,10 @@ class aa1TaskState(Device):
warnCode = Component(EpicsSignalRO, "WARNING", auto_monitor=True, kind=Kind.hinted)
def complete(self) -> StatusBase:
"""Bluesky flyer interface"""
print("Called aa1TaskState.complete()")
""" Wait for the task while RUNNING"""
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def notRunning(*args, old_value, value, timestamp, **kwargs):
def not_running(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else (value not in ["Running", 4])
timestamp_ = timestamp
@@ -413,199 +265,77 @@ class aa1TaskState(Device):
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.status, notRunning, settle_time=0.5)
status = SubscriptionStatus(self.status, not_running, settle_time=0.5)
return status
def kickoff(self) -> DeviceStatus:
""" Standard Bluesky kickoff method"""
status = DeviceStatus(self)
status.set_finished()
return status
def describe_collect(self) -> OrderedDict:
dd = OrderedDict()
dd["success"] = {
"source": "internal",
"dtype": "integer",
"shape": [],
"units": "",
"lower_ctrl_limit": 0,
"upper_ctrl_limit": 0,
}
return dd
def collect(self) -> OrderedDict:
ret = OrderedDict()
ret["timestamps"] = {"success": time.time()}
ret["data"] = {"success": 1}
yield ret
class aa1DataAcquisition(Device):
"""Controller Data Acquisition - DONT USE at Tomcat
This class implements the controller data collection feature of the
Automation1 controller. This feature logs various inputs at a
**fixed frequency** from 1 kHz up to 200 kHz.
Usage:
1. Start a new configuration with "startConfig"
2. Add your signals with "addXxxSignal"
3. Start your data collection
4. Read back the recorded data with "readback"
"""
# Status monitoring
status = Component(EpicsSignalRO, "RUNNING", auto_monitor=True, kind=Kind.hinted)
points_max = Component(EpicsSignal, "MAXPOINTS", kind=Kind.config, put_complete=True)
signal_num = Component(EpicsSignalRO, "NITEMS", kind=Kind.config)
points_total = Component(EpicsSignalRO, "NTOTAL", auto_monitor=True, kind=Kind.hinted)
points_collected = Component(EpicsSignalRO, "NCOLLECTED", auto_monitor=True, kind=Kind.hinted)
points_retrieved = Component(EpicsSignalRO, "NRETRIEVED", auto_monitor=True, kind=Kind.hinted)
overflow = Component(EpicsSignalRO, "OVERFLOW", auto_monitor=True, kind=Kind.hinted)
runmode = Component(EpicsSignalRO, "MODE_RBV", auto_monitor=True, kind=Kind.hinted)
# DAQ setup
numpoints = Component(EpicsSignal, "NPOINTS", kind=Kind.config, put_complete=True)
frequency = Component(EpicsSignal, "FREQUENCY", kind=Kind.config, put_complete=True)
_configure = Component(EpicsSignal, "CONFIGURE", kind=Kind.omitted, put_complete=True)
def startConfig(self, npoints: int, frequency: DataCollectionFrequency):
self.numpoints.set(npoints).wait()
self.frequency.set(frequency).wait()
self._configure.set("START").wait()
def clearConfig(self):
self._configure.set("CLEAR").wait()
srcTask = Component(EpicsSignal, "SRC_TASK", kind=Kind.config, put_complete=True)
srcAxis = Component(EpicsSignal, "SRC_AXIS", kind=Kind.config, put_complete=True)
srcCode = Component(EpicsSignal, "SRC_CODE", kind=Kind.config, put_complete=True)
_srcAdd = Component(EpicsSignal, "SRC_ADD", kind=Kind.omitted, put_complete=True)
def addAxisSignal(self, axis: int, code: int) -> None:
"""Add a new axis-specific data signal to the DAQ configuration. The
most common signals are PositionFeedback and PositionError.
"""
self.srcAxis.set(axis).wait()
self.srcCode.set(code).wait()
self._srcAdd.set("AXIS").wait()
def addTaskSignal(self, task: int, code: int) -> None:
"""Add a new task-specific data signal to the DAQ configuration"""
self.srcTask.set(task).wait()
self.srcCode.set(code).wait()
self._srcAdd.set("TASK").wait()
def addSystemSignal(self, code: int) -> None:
"""Add a new system data signal to the DAQ configuration. The most
common signal is SampleCollectionTime."""
self.srcCode.set(code).wait()
self._srcAdd.set("SYSTEM").wait()
# Starting / stopping the DAQ
_mode = Component(EpicsSignal, "MODE", kind=Kind.config, put_complete=True)
_switch = Component(EpicsSignal, "SET", kind=Kind.omitted, put_complete=True)
def start(self, mode=DataCollectionMode.Snapshot) -> None:
"""Start a new data collection"""
self._mode.set(mode).wait()
self._switch.set("START").wait()
def stop(self) -> None:
"""Stop a running data collection"""
self._switch.set("STOP").wait()
def run(self, mode=DataCollectionMode.Snapshot) -> None:
"""Start a new data collection"""
self._mode.set(mode).wait()
self._switch.set("START").wait()
# Wait for finishing acquisition
# Note: this is very bad blocking sleep
while self.status.value != 0:
sleep(0.1)
sleep(0.1)
# Data readback
data = self.data_rb.get()
rows = self.data_rows.get()
cols = self.data_cols.get()
if len(data) == 0 or rows == 0 or cols == 0:
sleep(0.5)
data = self.data_rb.get()
rows = self.data_rows.get()
cols = self.data_cols.get()
print(f"Data shape: {rows} x {cols}")
data = data.reshape([int(rows), -1])
return data
# DAQ data readback
data_rb = Component(EpicsPassiveRO, "DATA", kind=Kind.hinted)
data_rows = Component(EpicsSignalRO, "DATA_ROWS", auto_monitor=True, kind=Kind.hinted)
data_cols = Component(EpicsSignalRO, "DATA_COLS", auto_monitor=True, kind=Kind.hinted)
data_stat = Component(EpicsSignalRO, "DATA_AVG", auto_monitor=True, kind=Kind.hinted)
def dataReadBack(self) -> np.ndarray:
"""Retrieves collected data from the controller"""
data = self.data_rb.get()
rows = self.data_rows.get()
cols = self.data_cols.get()
if len(data) == 0 or rows == 0 or cols == 0:
sleep(0.2)
data = self.data_rb.get()
rows = self.data_rows.get()
cols = self.data_cols.get()
print(f"Data shape: {rows} x {cols}")
data = data.reshape([int(rows), -1])
return data
class aa1GlobalVariables(Device):
"""Global variables
This class provides an interface to directly read/write global variables
on the Automation1 controller. These variables are accesible from script
files and are thus a convenient way to interface with the outside word.
This class provides a low-level interface to directly read/write global
variables on the Automation1 controller. These variables are accesible
from script files and are thus a convenient way to interface with the
outside word.
Read operations take as input the memory address and the size
Write operations work with the memory address and value
Usage:
var = aa1Tasks(AA1_IOC_NAME+":VAR:", name="var")
var.wait_for_connection()
ret = var.readInt(42)
var.writeFloat(1000, np.random.random(1024))
ret_arr = var.readFloat(1000, 1024)
Examples:
----------
'''
var = aa1GlobalVariables(AA1_IOC_NAME+":VAR:", name="var")
var.wait_for_connection()
ret = var.readInt(42)
var.writeFloat(1000, np.random.random(1024))
ret_arr = var.readFloat(1000, 1024)
'''
"""
# Status monitoring
USER_ACCESS = ['read_int', 'write_int', 'read_float', 'write_float', 'read_string', 'write_string']
# Available capacity
num_real = Component(EpicsSignalRO, "NUM-REAL_RBV", kind=Kind.config)
num_int = Component(EpicsSignalRO, "NUM-INT_RBV", kind=Kind.config)
num_string = Component(EpicsSignalRO, "NUM-STRING_RBV", kind=Kind.config)
# Read-write interface
integer_addr = Component(EpicsSignal, "INT-ADDR", kind=Kind.omitted, put_complete=True)
integer_size = Component(EpicsSignal, "INT-SIZE", kind=Kind.omitted, put_complete=True)
integer = Component(EpicsSignal, "INT", kind=Kind.omitted, put_complete=True)
integer_rb = Component(EpicsPassiveRO, "INT-RBV", kind=Kind.omitted)
integer_rb = Component(EpicsPassiveRO, "INT-RBV", kind=Kind.normal)
integerarr = Component(EpicsSignal, "INTARR", kind=Kind.omitted, put_complete=True)
integerarr_rb = Component(EpicsPassiveRO, "INTARR-RBV", kind=Kind.omitted)
integerarr_rb = Component(EpicsPassiveRO, "INTARR-RBV", kind=Kind.normal)
real_addr = Component(EpicsSignal, "REAL-ADDR", kind=Kind.omitted, put_complete=True)
real_size = Component(EpicsSignal, "REAL-SIZE", kind=Kind.omitted, put_complete=True)
real = Component(EpicsSignal, "REAL", kind=Kind.omitted, put_complete=True)
real_rb = Component(EpicsPassiveRO, "REAL-RBV", kind=Kind.omitted)
real_rb = Component(EpicsPassiveRO, "REAL-RBV", kind=Kind.normal)
realarr = Component(EpicsSignal, "REALARR", kind=Kind.omitted, put_complete=True)
realarr_rb = Component(EpicsPassiveRO, "REALARR-RBV", kind=Kind.omitted)
realarr_rb = Component(EpicsPassiveRO, "REALARR-RBV", kind=Kind.normal)
string_addr = Component(EpicsSignal, "STRING-ADDR", kind=Kind.omitted, put_complete=True)
string = Component(EpicsSignal, "STRING", string=True, kind=Kind.omitted, put_complete=True)
string_rb = Component(EpicsPassiveRO, "STRING-RBV", string=True, kind=Kind.omitted)
string_rb = Component(EpicsPassiveRO, "STRING-RBV", string=True, kind=Kind.normal)
def readInt(self, address: int, size: int = None) -> int:
"""Read a 64-bit integer global variable"""
def read_int(self, address: int, size: int = None) -> int:
"""Read a 64-bit integer global variable
Method to reads scalar and array global integer variables.
Parameters:
------------
address : Memory (start) address of the global integer.
size : Array size, set to 0 or None for scalar [default=None]
"""
if address > self.num_int.get():
raise RuntimeError("Integer address {address} is out of range")
if size is None:
if size is None or size==0:
self.integer_addr.set(address).wait()
return self.integer_rb.get()
else:
@@ -613,8 +343,16 @@ class aa1GlobalVariables(Device):
self.integer_size.set(size).wait()
return self.integerarr_rb.get()
def writeInt(self, address: int, value) -> None:
"""Write a 64-bit integer global variable"""
def write_int(self, address: int, value) -> None:
"""Write a 64-bit integer global variable
Method to write scalar or array global integer variables.
Parameters:
------------
address : Memory (start) address of the global integer.
value : Scalar, list, tuple or ndarray of numbers.
"""
if address > self.num_int.get():
raise RuntimeError("Integer address {address} is out of range")
@@ -631,7 +369,7 @@ class aa1GlobalVariables(Device):
else:
raise RuntimeError("Unsupported integer value type: {type(value)}")
def readFloat(self, address: int, size: int = None) -> float:
def read_float(self, address: int, size: int = None) -> float:
"""Read a 64-bit double global variable"""
if address > self.num_real.get():
raise RuntimeError("Floating point address {address} is out of range")
@@ -644,7 +382,7 @@ class aa1GlobalVariables(Device):
self.real_size.set(size).wait()
return self.realarr_rb.get()
def writeFloat(self, address: int, value) -> None:
def write_float(self, address: int, value) -> None:
"""Write a 64-bit float global variable"""
if address > self.num_real.get():
raise RuntimeError("Float address {address} is out of range")
@@ -662,9 +400,15 @@ class aa1GlobalVariables(Device):
else:
raise RuntimeError("Unsupported float value type: {type(value)}")
def readString(self, address: int) -> str:
"""Read a 40 letter string global variable
ToDo: Automation 1 strings are 256 bytes
def read_string(self, address: int) -> str:
"""Read a string global variable
Method to read a sting global variable. Standard Automation1 strings
are 256 bytes long (255 sharacter).
Parameters:
------------
address : Memory address of the global string.
"""
if address > self.num_string.get():
raise RuntimeError("String address {address} is out of range")
@@ -672,10 +416,20 @@ class aa1GlobalVariables(Device):
self.string_addr.set(address).wait()
return self.string_rb.get()
def writeString(self, address: int, value) -> None:
"""Write a 40 bytes string global variable"""
def write_string(self, address: int, value) -> None:
"""Write a string global variable
Method to write a maximum 255 character string global integer variable.
Parameters:
------------
address : Memory address of the global string.
value : The string to write.
"""
if address > self.num_string.get():
raise RuntimeError("Integer address {address} is out of range")
if len(value) > 255:
raise RuntimeError(f"Global strings must be shorter than 255 characters, tried {len(value)}")
if isinstance(value, str):
self.string_addr.set(address).wait()
@@ -688,127 +442,34 @@ class aa1GlobalVariableBindings(Device):
"""Polled global variables
This class provides an interface to read/write the first few global variables
on the Automation1 controller. These variables are continuously polled
and are thus a convenient way to interface scripts with the outside word.
on the Automation1 controller. These variables can be directly set and are
continuously polled and are thus a convenient way to interface scripts with
the outside word.
"""
int0 = Component(EpicsSignalRO, "INT0_RBV", auto_monitor=True, name="int0", kind=Kind.hinted)
int1 = Component(EpicsSignalRO, "INT1_RBV", auto_monitor=True, name="int1", kind=Kind.hinted)
int2 = Component(EpicsSignalRO, "INT2_RBV", auto_monitor=True, name="int2", kind=Kind.hinted)
int3 = Component(EpicsSignalRO, "INT3_RBV", auto_monitor=True, name="int3", kind=Kind.hinted)
int8 = Component(
EpicsSignal,
"INT8_RBV",
put_complete=True,
write_pv="INT8",
auto_monitor=True,
name="int8",
kind=Kind.hinted,
)
int9 = Component(
EpicsSignal,
"INT9_RBV",
put_complete=True,
write_pv="INT9",
auto_monitor=True,
name="int9",
kind=Kind.hinted,
)
int10 = Component(
EpicsSignal,
"INT10_RBV",
put_complete=True,
write_pv="INT10",
auto_monitor=True,
name="int10",
kind=Kind.hinted,
)
int11 = Component(
EpicsSignal,
"INT11_RBV",
put_complete=True,
write_pv="INT11",
auto_monitor=True,
name="int11",
kind=Kind.hinted,
)
int8 = Component(EpicsSignal, "INT8_RBV", put_complete=True, write_pv="INT8", auto_monitor=True, name="int8")
int9 = Component(EpicsSignal, "INT9_RBV", put_complete=True, write_pv="INT9", auto_monitor=True, name="int9")
int10 = Component(EpicsSignal, "INT10_RBV", put_complete=True, write_pv="INT10", auto_monitor=True, name="int10")
int11 = Component(EpicsSignal, "INT11_RBV", put_complete=True, write_pv="INT11", auto_monitor=True, name="int11")
float0 = Component(
EpicsSignalRO, "REAL0_RBV", auto_monitor=True, name="float0", kind=Kind.hinted
)
float1 = Component(
EpicsSignalRO, "REAL1_RBV", auto_monitor=True, name="float1", kind=Kind.hinted
)
float2 = Component(
EpicsSignalRO, "REAL2_RBV", auto_monitor=True, name="float2", kind=Kind.hinted
)
float3 = Component(
EpicsSignalRO, "REAL3_RBV", auto_monitor=True, name="float3", kind=Kind.hinted
)
float16 = Component(
EpicsSignal,
"REAL16_RBV",
write_pv="REAL16",
put_complete=True,
auto_monitor=True,
name="float16",
kind=Kind.hinted,
)
float17 = Component(
EpicsSignal,
"REAL17_RBV",
write_pv="REAL17",
put_complete=True,
auto_monitor=True,
name="float17",
kind=Kind.hinted,
)
float18 = Component(
EpicsSignal,
"REAL18_RBV",
write_pv="REAL18",
put_complete=True,
auto_monitor=True,
name="float18",
kind=Kind.hinted,
)
float19 = Component(
EpicsSignal,
"REAL19_RBV",
write_pv="REAL19",
put_complete=True,
auto_monitor=True,
name="float19",
kind=Kind.hinted,
)
float0 = Component(EpicsSignalRO, "REAL0_RBV", auto_monitor=True, name="float0")
float1 = Component(EpicsSignalRO, "REAL1_RBV", auto_monitor=True, name="float1")
float2 = Component(EpicsSignalRO, "REAL2_RBV", auto_monitor=True, name="float2")
float3 = Component(EpicsSignalRO, "REAL3_RBV", auto_monitor=True, name="float3")
float16 = Component(EpicsSignal, "REAL16_RBV", write_pv="REAL16", put_complete=True, auto_monitor=True, name="float16")
float17 = Component(EpicsSignal, "REAL17_RBV", write_pv="REAL17", put_complete=True, auto_monitor=True, name="float17")
float18 = Component(EpicsSignal, "REAL18_RBV", write_pv="REAL18", put_complete=True, auto_monitor=True, name="float18")
float19 = Component(EpicsSignal, "REAL19_RBV", write_pv="REAL19", put_complete=True, auto_monitor=True, name="float19")
# BEC LiveTable crashes on non-numeric values
str0 = Component(
EpicsSignalRO, "STR0_RBV", auto_monitor=True, string=True, name="str0", kind=Kind.config
)
str1 = Component(
EpicsSignalRO, "STR1_RBV", auto_monitor=True, string=True, name="str1", kind=Kind.config
)
str4 = Component(
EpicsSignal,
"STR4_RBV",
put_complete=True,
string=True,
auto_monitor=True,
write_pv="STR4",
name="str4",
kind=Kind.config,
)
str5 = Component(
EpicsSignal,
"STR5_RBV",
put_complete=True,
string=True,
auto_monitor=True,
write_pv="STR5",
name="str5",
kind=Kind.config,
)
str0 = Component(EpicsSignalRO, "STR0_RBV", auto_monitor=True, string=True, name="str0")
str1 = Component(EpicsSignalRO, "STR1_RBV", auto_monitor=True, string=True, name="str1")
str4 = Component(EpicsSignal, "STR4_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR4", name="str4")
str5 = Component(EpicsSignal, "STR5_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR5", name="str5")
class aa1AxisIo(Device):
@@ -819,8 +480,6 @@ class aa1AxisIo(Device):
should be done in AeroScript. Only one pin can be writen directly but
several can be polled!
"""
polllvl = Component(EpicsSignal, "POLLLVL", put_complete=True, kind=Kind.config)
ai0 = Component(EpicsSignalRO, "AI0-RBV", auto_monitor=True, kind=Kind.hinted)
ai1 = Component(EpicsSignalRO, "AI1-RBV", auto_monitor=True, kind=Kind.hinted)
ai2 = Component(EpicsSignalRO, "AI2-RBV", auto_monitor=True, kind=Kind.hinted)
@@ -832,25 +491,21 @@ class aa1AxisIo(Device):
di0 = Component(EpicsSignalRO, "DI0-RBV", auto_monitor=True, kind=Kind.hinted)
do0 = Component(EpicsSignalRO, "DO0-RBV", auto_monitor=True, kind=Kind.hinted)
ai_addr = Component(EpicsSignal, "AI-ADDR", put_complete=True, kind=Kind.config)
ai = Component(EpicsSignalRO, "AI-RBV", auto_monitor=True, kind=Kind.hinted)
ao_addr = Component(EpicsSignal, "AO-ADDR", put_complete=True, kind=Kind.config)
ao = Component(EpicsSignal, "AO-RBV", write_pv="AO", auto_monitor=True, kind=Kind.hinted)
di_addr = Component(EpicsSignal, "DI-ADDR", put_complete=True, kind=Kind.config)
di = Component(EpicsSignalRO, "DI-RBV", auto_monitor=True, kind=Kind.hinted)
do_addr = Component(EpicsSignal, "DO-ADDR", put_complete=True, kind=Kind.config)
do = Component(EpicsSignal, "DO-RBV", write_pv="DO", auto_monitor=True, kind=Kind.hinted)
def setAnalog(self, pin: int, value: float, settle_time=0.05):
def set_analog(self, pin: int, value: float, settle_time=0.05):
""" Set an analog output pin"""
# Set the address
self.ao_addr.set(pin).wait()
# Set the voltage
self.ao.set(value, settle_time=settle_time).wait()
def setDigital(self, pin: int, value: int, settle_time=0.05):
def set_digital(self, pin: int, value: int, settle_time=0.05):
""" Set a digital output pin"""
# Set the address
self.do_addr.set(pin).wait()
# Set the voltage
@@ -877,22 +532,22 @@ class aa1AxisPsoBase(Device):
# General module status
status = Component(EpicsSignalRO, "STATUS", auto_monitor=True, kind=Kind.hinted)
output = Component(EpicsSignalRO, "OUTPUT-RBV", auto_monitor=True, kind=Kind.hinted)
address = Component(EpicsSignalRO, "ARRAY-ADDR", kind=Kind.config)
_eventSingle = Component(EpicsSignal, "EVENT:SINGLE", put_complete=True, kind=Kind.omitted)
_reset = Component(EpicsSignal, "RESET", put_complete=True, kind=Kind.omitted)
posInput = Component(EpicsSignal, "DIST:INPUT", put_complete=True, kind=Kind.omitted)
# ########################################################################
# PSO Distance event module
dstEventsEna = Component(EpicsSignal, "DIST:EVENTS", put_complete=True, kind=Kind.omitted)
dstCounterEna = Component(EpicsSignal, "DIST:COUNTER", put_complete=True, kind=Kind.omitted)
dstCounterVal = Component(EpicsSignalRO, "DIST:CTR0_RBV", auto_monitor=True, kind=Kind.hinted)
dstArrayIdx = Component(EpicsSignalRO, "DIST:IDX_RBV", auto_monitor=True, kind=Kind.hinted)
dstCounterVal = Component(EpicsSignalRO, "DIST:CTR0_RBV", auto_monitor=True, kind=Kind.normal)
dstArrayIdx = Component(EpicsSignalRO, "DIST:IDX_RBV", auto_monitor=True, kind=Kind.normal)
dstArrayDepleted = Component(
EpicsSignalRO, "DIST:ARRAY-DEPLETED-RBV", auto_monitor=True, kind=Kind.hinted
EpicsSignalRO, "DIST:DEPLETED-RBV", auto_monitor=True, kind=Kind.normal
)
dstDirection = Component(EpicsSignal, "DIST:EVENTDIR", put_complete=True, kind=Kind.omitted)
dstDistance = Component(EpicsSignal, "DIST:DISTANCE", put_complete=True, kind=Kind.hinted)
dstDistance = Component(EpicsSignal, "DIST:DISTANCE", put_complete=True, kind=Kind.normal)
dstDistanceArr = Component(EpicsSignal, "DIST:DISTANCES", put_complete=True, kind=Kind.omitted)
dstArrayRearm = Component(EpicsSignal, "DIST:REARM-ARRAY", put_complete=True, kind=Kind.omitted)
@@ -904,6 +559,8 @@ class aa1AxisPsoBase(Device):
winCounter = Component(EpicsSignal, "WINDOW0:COUNTER", put_complete=True, kind=Kind.omitted)
_winLower = Component(EpicsSignal, "WINDOW0:LOWER", put_complete=True, kind=Kind.omitted)
_winUpper = Component(EpicsSignal, "WINDOW0:UPPER", put_complete=True, kind=Kind.omitted)
winArrayIdx = Component(EpicsSignalRO, "WINDOW0:IDX_NXT", auto_monitor=True, kind=Kind.normal)
winArrayDepleted = Component(EpicsSignalRO, "WINDOW0:DEPLETED-RBV", auto_monitor=True, kind=Kind.normal)
# ########################################################################
# PSO waveform module
@@ -928,10 +585,10 @@ class aa1AxisPsoBase(Device):
self._eventSingle.set(1, settle_time=settle_time).wait()
def toggle(self):
orig_waveMode = self.waveMode.get()
orig_wave_mode = self.waveMode.get()
self.waveMode.set("Toggle").wait()
self.fire(0.1)
self.waveMode.set(orig_waveMode).wait()
self.waveMode.set(orig_wave_mode).wait()
class aa1AxisPsoDistance(aa1AxisPsoBase):