refactor: fixed formatter for aerotech

This commit is contained in:
wakonig_k 2024-03-12 16:30:13 +01:00
parent 75360a9636
commit 573da8a20b
2 changed files with 161 additions and 145 deletions

View File

@ -40,5 +40,12 @@ from .aerotech.AerotechAutomation1 import (
)
from .SpmBase import SpmBase
from .aerotech.AerotechAutomation1 import aa1Controller, aa1Tasks, aa1GlobalVariables, aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisDriveDataCollection, EpicsMotorX
from .aerotech.AerotechAutomation1 import (
aa1Controller,
aa1Tasks,
aa1GlobalVariables,
aa1GlobalVariableBindings,
aa1AxisPsoDistance,
aa1AxisDriveDataCollection,
EpicsMotorX,
)

View File

@ -38,8 +38,7 @@ from collections import OrderedDict
class EpicsMotorX(EpicsMotor):
""" Special motor class that provides flyer interface and progress bar.
"""
"""Special motor class that provides flyer interface and progress bar."""
SUB_PROGRESS = "progress"
@ -88,7 +87,10 @@ class EpicsMotorX(EpicsMotor):
"""Progress update on the scan"""
if (self._startPosition is None) or (self._targetPosition is None) or (not self.moving):
self._run_subs(
sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1,
sub_type=self.SUB_PROGRESS,
value=1,
max_value=1,
done=1,
)
return
@ -105,8 +107,7 @@ class EpicsMotorX(EpicsMotor):
class EpicsPassiveRO(EpicsSignalRO):
""" Small helper class to read PVs that need to be processed first.
"""
"""Small helper class to read PVs that need to be processed first."""
def __init__(self, read_pv, *, string=False, name=None, **kwargs):
super().__init__(read_pv, string=string, name=name, **kwargs)
@ -159,33 +160,33 @@ class aa1Controller(Device):
class aa1Tasks(Device):
""" Task management API
The place to manage tasks and AeroScript user files on the controller.
You can read/write/compile/execute AeroScript files and also retrieve
saved data files from the controller. It will also work around an ophyd
bug that swallows failures.
Execution does not require to store the script in a file, it will compile
it and run it directly on a certain thread. But there's no way to
retrieve the source code.
Write a text into a file on the aerotech controller and execute it with kickoff.
'''
script="var $axis as axis = ROTY\\nMoveAbsolute($axis, 42, 90)"
tsk = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk")
tsk.wait_for_connection()
tsk.configure({'text': script, 'filename': "foobar.ascript", 'taskIndex': 4})
tsk.kickoff().wait()
'''
"""Task management API
Just execute an ascript file already on the aerotech controller.
'''
tsk = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk")
tsk.wait_for_connection()
tsk.configure({'filename': "foobar.ascript", 'taskIndex': 4})
tsk.kickoff().wait()
'''
The place to manage tasks and AeroScript user files on the controller.
You can read/write/compile/execute AeroScript files and also retrieve
saved data files from the controller. It will also work around an ophyd
bug that swallows failures.
Execution does not require to store the script in a file, it will compile
it and run it directly on a certain thread. But there's no way to
retrieve the source code.
Write a text into a file on the aerotech controller and execute it with kickoff.
'''
script="var $axis as axis = ROTY\\nMoveAbsolute($axis, 42, 90)"
tsk = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk")
tsk.wait_for_connection()
tsk.configure({'text': script, 'filename': "foobar.ascript", 'taskIndex': 4})
tsk.kickoff().wait()
'''
Just execute an ascript file already on the aerotech controller.
'''
tsk = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk")
tsk.wait_for_connection()
tsk.configure({'filename': "foobar.ascript", 'taskIndex': 4})
tsk.kickoff().wait()
'''
"""
@ -217,7 +218,7 @@ class aa1Tasks(Device):
parent=None,
**kwargs,
):
""" __init__ MUST have a full argument list"""
"""__init__ MUST have a full argument list"""
super().__init__(
prefix=prefix,
name=name,
@ -237,7 +238,10 @@ class aa1Tasks(Device):
"""Progress update on the scan"""
value = self.progress()
self._run_subs(
sub_type=self.SUB_PROGRESS, value=value, max_value=1, done=1,
sub_type=self.SUB_PROGRESS,
value=value,
max_value=1,
done=1,
)
def _progress(self) -> None:
@ -251,7 +255,7 @@ class aa1Tasks(Device):
return 1
def readFile(self, filename: str) -> str:
""" Read a file from the controller """
"""Read a file from the controller"""
# Have to use CHAR array due to EPICS LSI bug...
self.fileName.set(filename).wait()
filebytes = self._fileRead.get()
@ -262,12 +266,12 @@ class aa1Tasks(Device):
return filetext
def writeFile(self, filename: str, filetext: str) -> None:
""" Write a file to the controller """
"""Write a file to the controller"""
self.fileName.set(filename).wait()
self._fileWrite.set(filetext).wait()
def runScript(self, filename: str, taskIndex: int == 2, filetext=None, settle_time=0.5) -> None:
""" Run a script file that either exists, or is newly created and compiled"""
"""Run a script file that either exists, or is newly created and compiled"""
self.configure({"text": filetext, "filename": filename, "taskIndex": taskIndex})
print("Runscript configured")
@ -275,7 +279,7 @@ class aa1Tasks(Device):
print("Runscript waited")
def execute(self, text: str, taskIndex: int = 3, mode: str = 0, settle_time=0.5):
""" Run a short text command on the Automation1 controller"""
"""Run a short text command on the Automation1 controller"""
print(f"Executing program on task: {taskIndex}")
self.configure({"text": text, "taskIndex": taskIndex, "mode": mode})
@ -288,8 +292,7 @@ class aa1Tasks(Device):
return raw
def configure(self, d: dict = {}) -> tuple:
""" Configuration interface for flying
"""
"""Configuration interface for flying"""
# Unrolling the configuration dict
text = str(d["text"]) if "text" in d else None
filename = str(d["filename"]) if "filename" in d else None
@ -342,15 +345,15 @@ class aa1Tasks(Device):
##########################################################################
# Bluesky stepper interface
def stage(self) -> None:
""" Default staging """
"""Default staging"""
super().stage()
def unstage(self) -> None:
""" Default unstaging """
"""Default unstaging"""
super().unstage()
def trigger(self, settle_time=0.2) -> Status:
""" Execute the script on the configured task"""
"""Execute the script on the configured task"""
if self._isStepConfig:
return self.kickoff(settle_time)
else:
@ -361,13 +364,13 @@ class aa1Tasks(Device):
return status
def stop(self):
""" Stop the currently selected task """
"""Stop the currently selected task"""
self.switch.set("Stop").wait()
##########################################################################
# Flyer interface
def kickoff(self, settle_time=0.2) -> DeviceStatus:
""" Execute the script on the configured task"""
"""Execute the script on the configured task"""
if self._isConfigured:
if self._textToExecute is not None:
print(f"Kickoff directly executing string: {self._textToExecute}")
@ -382,7 +385,7 @@ class aa1Tasks(Device):
return status
def complete(self) -> DeviceStatus:
""" Execute the script on the configured task"""
"""Execute the script on the configured task"""
print("Called aa1Task.complete()")
timestamp_ = 0
taskIdx = int(self.taskIndex.get())
@ -418,10 +421,10 @@ class aa1Tasks(Device):
class aa1TaskState(Device):
""" Task state monitoring API
This is the task state monitoring interface for Automation1 tasks. It
does not launch execution, but can wait for the execution to complete.
"""Task state monitoring API
This is the task state monitoring interface for Automation1 tasks. It
does not launch execution, but can wait for the execution to complete.
"""
index = Component(EpicsSignalRO, "INDEX", kind=Kind.config)
@ -430,7 +433,7 @@ class aa1TaskState(Device):
warnCode = Component(EpicsSignalRO, "WARNING", auto_monitor=True, kind=Kind.hinted)
def complete(self) -> StatusBase:
""" Bluesky flyer interface"""
"""Bluesky flyer interface"""
print("Called aa1TaskState.complete()")
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
@ -471,16 +474,16 @@ class aa1TaskState(Device):
class aa1DataAcquisition(Device):
""" Controller Data Acquisition - DONT USE at Tomcat
This class implements the controller data collection feature of the
Automation1 controller. This feature logs various inputs at a
**fixed frequency** from 1 kHz up to 200 kHz.
Usage:
1. Start a new configuration with "startConfig"
2. Add your signals with "addXxxSignal"
3. Start your data collection
4. Read back the recorded data with "readback"
"""Controller Data Acquisition - DONT USE at Tomcat
This class implements the controller data collection feature of the
Automation1 controller. This feature logs various inputs at a
**fixed frequency** from 1 kHz up to 200 kHz.
Usage:
1. Start a new configuration with "startConfig"
2. Add your signals with "addXxxSignal"
3. Start your data collection
4. Read back the recorded data with "readback"
"""
# Status monitoring
@ -512,22 +515,22 @@ class aa1DataAcquisition(Device):
_srcAdd = Component(EpicsSignal, "SRC_ADD", kind=Kind.omitted, put_complete=True)
def addAxisSignal(self, axis: int, code: int) -> None:
""" Add a new axis-specific data signal to the DAQ configuration. The
most common signals are PositionFeedback and PositionError.
"""Add a new axis-specific data signal to the DAQ configuration. The
most common signals are PositionFeedback and PositionError.
"""
self.srcAxis.set(axis).wait()
self.srcCode.set(code).wait()
self._srcAdd.set("AXIS").wait()
def addTaskSignal(self, task: int, code: int) -> None:
""" Add a new task-specific data signal to the DAQ configuration"""
"""Add a new task-specific data signal to the DAQ configuration"""
self.srcTask.set(task).wait()
self.srcCode.set(code).wait()
self._srcAdd.set("TASK").wait()
def addSystemSignal(self, code: int) -> None:
""" Add a new system data signal to the DAQ configuration. The most
common signal is SampleCollectionTime. """
"""Add a new system data signal to the DAQ configuration. The most
common signal is SampleCollectionTime."""
self.srcCode.set(code).wait()
self._srcAdd.set("SYSTEM").wait()
@ -536,16 +539,16 @@ class aa1DataAcquisition(Device):
_switch = Component(EpicsSignal, "SET", kind=Kind.omitted, put_complete=True)
def start(self, mode=DataCollectionMode.Snapshot) -> None:
""" Start a new data collection """
"""Start a new data collection"""
self._mode.set(mode).wait()
self._switch.set("START").wait()
def stop(self) -> None:
""" Stop a running data collection """
"""Stop a running data collection"""
self._switch.set("STOP").wait()
def run(self, mode=DataCollectionMode.Snapshot) -> None:
""" Start a new data collection """
"""Start a new data collection"""
self._mode.set(mode).wait()
self._switch.set("START").wait()
# Wait for finishing acquisition
@ -589,12 +592,12 @@ class aa1DataAcquisition(Device):
class aa1GlobalVariables(Device):
""" Global variables
This class provides an interface to directly read/write global variables
"""Global variables
This class provides an interface to directly read/write global variables
on the Automation1 controller. These variables are accesible from script
files and are thus a convenient way to interface with the outside word.
Read operations take as input the memory address and the size
Write operations work with the memory address and value
@ -631,7 +634,7 @@ class aa1GlobalVariables(Device):
string_rb = Component(EpicsPassiveRO, "STRING-RBV", string=True, kind=Kind.omitted)
def readInt(self, address: int, size: int = None) -> int:
""" Read a 64-bit integer global variable """
"""Read a 64-bit integer global variable"""
if address > self.num_int.get():
raise RuntimeError("Integer address {address} is out of range")
@ -644,7 +647,7 @@ class aa1GlobalVariables(Device):
return self.integerarr_rb.get()
def writeInt(self, address: int, value) -> None:
""" Write a 64-bit integer global variable """
"""Write a 64-bit integer global variable"""
if address > self.num_int.get():
raise RuntimeError("Integer address {address} is out of range")
@ -662,7 +665,7 @@ class aa1GlobalVariables(Device):
raise RuntimeError("Unsupported integer value type: {type(value)}")
def readFloat(self, address: int, size: int = None) -> float:
""" Read a 64-bit double global variable """
"""Read a 64-bit double global variable"""
if address > self.num_real.get():
raise RuntimeError("Floating point address {address} is out of range")
@ -675,7 +678,7 @@ class aa1GlobalVariables(Device):
return self.realarr_rb.get()
def writeFloat(self, address: int, value) -> None:
""" Write a 64-bit float global variable """
"""Write a 64-bit float global variable"""
if address > self.num_real.get():
raise RuntimeError("Float address {address} is out of range")
@ -693,7 +696,7 @@ class aa1GlobalVariables(Device):
raise RuntimeError("Unsupported float value type: {type(value)}")
def readString(self, address: int) -> str:
""" Read a 40 letter string global variable
"""Read a 40 letter string global variable
ToDo: Automation 1 strings are 256 bytes
"""
if address > self.num_string.get():
@ -703,7 +706,7 @@ class aa1GlobalVariables(Device):
return self.string_rb.get()
def writeString(self, address: int, value) -> None:
""" Write a 40 bytes string global variable """
"""Write a 40 bytes string global variable"""
if address > self.num_string.get():
raise RuntimeError("Integer address {address} is out of range")
@ -715,9 +718,9 @@ class aa1GlobalVariables(Device):
class aa1GlobalVariableBindings(Device):
""" Polled global variables
This class provides an interface to read/write the first few global variables
"""Polled global variables
This class provides an interface to read/write the first few global variables
on the Automation1 controller. These variables are continuously polled
and are thus a convenient way to interface scripts with the outside word.
"""
@ -842,11 +845,11 @@ class aa1GlobalVariableBindings(Device):
class aa1AxisIo(Device):
""" Analog / digital Input-Output
This class provides convenience wrappers around the Aerotech API's axis
"""Analog / digital Input-Output
This class provides convenience wrappers around the Aerotech API's axis
specific IO functionality. Note that this is a low-speed API, actual work
should be done in AeroScript. Only one pin can be writen directly but
should be done in AeroScript. Only one pin can be writen directly but
several can be polled!
"""
@ -888,18 +891,18 @@ class aa1AxisIo(Device):
class aa1AxisPsoBase(Device):
""" Position Sensitive Output - Base class
This class provides convenience wrappers around the Aerotech IOC's PSO
functionality. As a base class, it's just a collection of PVs without
significant logic (that should be implemented in the child classes).
It uses event-waveform concept to produce signals on the configured
"""Position Sensitive Output - Base class
This class provides convenience wrappers around the Aerotech IOC's PSO
functionality. As a base class, it's just a collection of PVs without
significant logic (that should be implemented in the child classes).
It uses event-waveform concept to produce signals on the configured
output pin: a specified position based event will trigger the generation
of a waveform on the oputput that can be either used as exposure enable,
as individual trigger or as a series of triggers per each event.
of a waveform on the oputput that can be either used as exposure enable,
as individual trigger or as a series of triggers per each event.
As a first approach, the module follows a simple pipeline structure:
Genrator --> Event --> Waveform --> Output
Specific operation modes should be implemented in child classes.
"""
@ -954,7 +957,7 @@ class aa1AxisPsoBase(Device):
outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.omitted)
def fire(self, settle_time=None):
""" Fire a single PSO event (i.e. manual software trigger)"""
"""Fire a single PSO event (i.e. manual software trigger)"""
self._eventSingle.set(1, settle_time=settle_time).wait()
def toggle(self):
@ -965,22 +968,22 @@ class aa1AxisPsoBase(Device):
class aa1AxisPsoDistance(aa1AxisPsoBase):
""" Position Sensitive Output - Distance mode
This class provides convenience wrappers around the Aerotech API's PSO
functionality in distance mode. It uses event-waveform concept to produce
signals on the configured output pin: a specified position based event
"""Position Sensitive Output - Distance mode
This class provides convenience wrappers around the Aerotech API's PSO
functionality in distance mode. It uses event-waveform concept to produce
signals on the configured output pin: a specified position based event
will trigger the generation af a waveform on the oputput that can be either
used as exposure enable, as individual trigger or as a series of triggers
per each event.
used as exposure enable, as individual trigger or as a series of triggers
per each event.
As a first approach, the module follows a simple pipeline structure:
Genrator (distance) --> Event --> Waveform --> Output
The module provides configuration interface to common functionality, such
The module provides configuration interface to common functionality, such
as fixed distance or array based triggering and can serve as a base for
future advanced functionality. The relative distances ease the limitations
future advanced functionality. The relative distances ease the limitations
coming from 32 bit PSO positions.
For a more detailed description of additional signals and masking plase
For a more detailed description of additional signals and masking plase
refer to Automation1's online manual.
Usage:
@ -1010,7 +1013,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
parent=None,
**kwargs,
):
""" __init__ MUST have a full argument list"""
"""__init__ MUST have a full argument list"""
super().__init__(
prefix=prefix,
name=name,
@ -1028,7 +1031,10 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
if self.dstArrayDepleted.value:
print("PSO array depleted")
self._run_subs(
sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1,
sub_type=self.SUB_PROGRESS,
value=1,
max_value=1,
done=1,
)
return
@ -1045,11 +1051,11 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# ########################################################################
# PSO high level interface
def configure(self, d: dict = {}) -> tuple:
""" Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually pulsed/toggled.
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually pulsed/toggled.
"""
distance = d["distance"]
wmode = str(d["wmode"])
@ -1111,7 +1117,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# ########################################################################
# Bluesky step scan interface
def complete(self, settle_time=0.1) -> DeviceStatus:
""" DDC just reads back whatever is available in the buffers"""
"""DDC just reads back whatever is available in the buffers"""
sleep(settle_time)
status = DeviceStatus(self)
status.set_finished()
@ -1143,7 +1149,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
return status
def complete(self) -> DeviceStatus:
""" Bluesky flyer interface"""
"""Bluesky flyer interface"""
# Array mode waits until the buffer is empty
if hasattr(self, "_distanceValue") and isinstance(
self._distanceValue, (np.ndarray, list, tuple)
@ -1189,22 +1195,22 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
class aa1AxisPsoWindow(aa1AxisPsoBase):
""" Position Sensitive Output - Window mode
This class provides convenience wrappers around the Aerotech API's PSO
functionality in window mode. It can either use the event-waveform concept
or provide a direct window output signal (in/out) to the output pin. The
latter is particularly well-suited for the generation of trigger enable
"""Position Sensitive Output - Window mode
This class provides convenience wrappers around the Aerotech API's PSO
functionality in window mode. It can either use the event-waveform concept
or provide a direct window output signal (in/out) to the output pin. The
latter is particularly well-suited for the generation of trigger enable
signals, while in event mode it allows the finetuning of trigger lenth.
As a first approach, the module follows a simple pipeline structure:
Genrator --> Event --> Waveform --> Output pin
Genrator --> Window output --> Output pin
The module provides configuration interface to common functionality, such
as repeated trigger enable signal or fixed area scaning. Unfortunately the
entered positions are absolute, meaning this mode has an inherent limitation
The module provides configuration interface to common functionality, such
as repeated trigger enable signal or fixed area scaning. Unfortunately the
entered positions are absolute, meaning this mode has an inherent limitation
with encoder counters being kept in 32 bit integers.
For a more detailed description of additional signals and masking plase
For a more detailed description of additional signals and masking plase
refer to Automation1's online manual.
"""
@ -1219,7 +1225,7 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
parent=None,
**kwargs,
):
""" __init__ MUST have a full argument list"""
"""__init__ MUST have a full argument list"""
super().__init__(
prefix=prefix,
name=name,
@ -1235,11 +1241,11 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
# ########################################################################
# PSO high level interface
def configure(self, d: dict = {}) -> tuple:
""" Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually output/pulsed/toggled.
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually output/pulsed/toggled.
"""
bounds = d["distance"]
wmode = str(d["wmode"])
@ -1328,13 +1334,13 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
class aa1AxisDriveDataCollection(Device):
""" Axis data collection
This class provides convenience wrappers around the Aerotech API's axis
"""Axis data collection
This class provides convenience wrappers around the Aerotech API's axis
specific data collection functionality. This module allows to record
hardware synchronized signals with up to 200 kHz.
The default configuration is using a fixed memory mapping allowing up to
The default configuration is using a fixed memory mapping allowing up to
1 million recorded data points on an XC4e (this depends on controller).
Usage:
@ -1393,7 +1399,10 @@ class aa1AxisDriveDataCollection(Device):
"""Progress update on the scan"""
if self.state.value not in (2, "Acquiring"):
self._run_subs(
sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1,
sub_type=self.SUB_PROGRESS,
value=1,
max_value=1,
done=1,
)
return
@ -1441,18 +1450,18 @@ class aa1AxisDriveDataCollection(Device):
return status
def complete(self, settle_time=0.1) -> DeviceStatus:
""" DDC just reads back whatever is available in the buffers"""
"""DDC just reads back whatever is available in the buffers"""
sleep(settle_time)
status = DeviceStatus(self)
status.set_finished()
return status
def _collect(self, index=0):
""" Force a readback of the data buffer
Note that there's a weird behaviour in ophyd that it issues an
initial update event with the initial value but 0 timestamp. Theese
old_values are invalid and must be filtered out.
"""Force a readback of the data buffer
Note that there's a weird behaviour in ophyd that it issues an
initial update event with the initial value but 0 timestamp. Theese
old_values are invalid and must be filtered out.
"""
# Define wait until the busy flag goes down (excluding initial update)