Device reformatting to 9.55

This commit is contained in:
gac-x05la
2024-12-13 15:37:05 +01:00
parent 39403229f6
commit e22cdeba30
7 changed files with 120 additions and 107 deletions

View File

@@ -1,7 +1,11 @@
from time import sleep
# -*- coding: utf-8 -*-
"""
Ophyd device for the Aerotech Automation1 IOC's generic interfaces
@author: mohacsi_i
"""
import numpy as np
from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
from bec_lib import bec_logger
logger = bec_logger.logger
@@ -38,8 +42,8 @@ class aa1Controller(Device):
taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config)
fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.normal)
slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.normal)
errno = Component(EpicsSignalRO, "ERRNO", auto_monitor=True, kind=Kind.hinted)
errnmsg = Component(EpicsSignalRO, "ERRMSG", auto_monitor=True, kind=Kind.hinted)
errno = Component(EpicsSignalRO, "ERRNO", auto_monitor=True, kind=Kind.normal)
errnmsg = Component(EpicsSignalRO, "ERRMSG", auto_monitor=True, kind=Kind.normal)
_set_ismc = Component(EpicsSignal, "SET", put_complete=True, kind=Kind.omitted)
USER_ACCESS = ["reset"]
@@ -111,10 +115,10 @@ class aa1GlobalVariables(Device):
if size is None or size == 0:
self.integer_addr.set(address).wait()
return self.integer_rb.get()
else:
self.integer_addr.set(address).wait()
self.integer_size.set(size).wait()
return self.integerarr_rb.get()
self.integer_addr.set(address).wait()
self.integer_size.set(size).wait()
return self.integerarr_rb.get()
def write_int(self, address: int, value, settle_time=0.1) -> None:
"""Write a 64-bit integer global variable
@@ -150,10 +154,10 @@ class aa1GlobalVariables(Device):
if size is None:
self.real_addr.set(address).wait()
return self.real_rb.get()
else:
self.real_addr.set(address).wait()
self.real_size.set(size).wait()
return self.realarr_rb.get()
self.real_addr.set(address).wait()
self.real_size.set(size).wait()
return self.realarr_rb.get()
def write_float(self, address: int, value, settle_time=0.1) -> None:
"""Write a 64-bit float global variable"""

View File

@@ -1,5 +1,14 @@
# -*- coding: utf-8 -*-
"""
Enumerations for the Aerotech Automation1 controller as adopted from the Aerotech library.
@author: mohacsi_i
"""
from enum import Enum
# pylint: disable=missing-class-docstring
# pylint: disable=too-few-public-methods
class TomcatSequencerState:
IDLE = 0

View File

@@ -1,15 +1,16 @@
# -*- coding: utf-8 -*-
"""
Ophyd device for the Aerotech Automation1 IOC's axis-specific synchronized
drive data collection (DDC) interface.
@author: mohacsi_i
"""
import time
from collections import OrderedDict
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
try:
from AerotechAutomation1Enums import DriveDataCaptureInput, DriveDataCaptureTrigger
except ModuleNotFoundError:
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureInput
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureTrigger
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
@@ -20,7 +21,7 @@ logger = bec_logger.logger
class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
"""Configuration and staging
"""Mixin class for self-configuration and staging
NOTE: scripted scans start drive data collection internally
"""
@@ -29,23 +30,29 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
"""Configuration and staging"""
# Fish out configuration from scaninfo (does not need to be full configuration)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs["ddc_trigger"]
if "steps" in scanargs and "exp_burst" in scanargs:
scan_steps = scanargs["steps"]
scan_burst = scanargs["exp_burst"]
d["num_points_total"] = (scan_steps+1) * scan_burst
elif "exp_burst" in scanargs:
d["num_points_total"] = scanargs["exp_burst"]
elif "steps" in scanargs:
d["num_points_total"] = scanargs["steps"]
if "ddc_num_points" in scanargs:
d["num_points_total"] = scanargs["ddc_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs['steps'] is not None:
num_points *= scanargs["steps"]
points_valid = True
elif "exp_burst" in scanargs and scanargs['exp_burst'] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
elif "repeats" in scanargs and scanargs['repeats'] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
# Perform bluesky-style configuration
if len(d) > 0:
@@ -107,7 +114,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
custom_prepare_cls = AerotechDriveDataCollectionMixin
USER_ACCESS = ["configure", "reset"]
def configure(self, d: dict = {}) -> tuple:
def configure(self, d: dict = None) -> tuple:
"""Configure data capture
Configures the hardware synchronized drive data capture (DDC) on an
@@ -117,14 +124,15 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
"""
old = self.read_configuration()
if "num_points_total" in d:
self.npoints.set(d["num_points_total"]).wait()
if "ddc_trigger" in d:
self._trigger.set(d['ddc_trigger']).wait()
if "ddc_source0" in d:
self._input0.set(d['ddc_source0']).wait()
if "ddc_source1" in d:
self._input1.set(d['ddc_source1']).wait()
if d is not None:
if "num_points_total" in d:
self.npoints.set(d["num_points_total"]).wait()
if "ddc_trigger" in d:
self._trigger.set(d['ddc_trigger']).wait()
if "ddc_source0" in d:
self._input0.set(d['ddc_source0']).wait()
if "ddc_source1" in d:
self._input1.set(d['ddc_source1']).wait()
# Reset incremental readback
self._switch.set("ResetRB", settle_time=0.1).wait()
@@ -150,17 +158,17 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def negEdge(*args, old_value, value, timestamp, **kwargs):
def neg_edge(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else (old_value == 1 and value == 0)
timestamp_ = timestamp
return result
if index == 0:
status = SubscriptionStatus(self._readstatus0, negEdge, settle_time=0.5)
status = SubscriptionStatus(self._readstatus0, neg_edge, settle_time=0.5)
self._readback0.set(1).wait()
elif index == 1:
status = SubscriptionStatus(self._readstatus1, negEdge, settle_time=0.5)
status = SubscriptionStatus(self._readstatus1, neg_edge, settle_time=0.5)
self._readback1.set(1).wait()
# Start asynchronous readback
@@ -168,6 +176,8 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
return status
def describe_collect(self) -> OrderedDict:
"""Describes collected array format according to JSONschema
"""
ret = OrderedDict()
ret["buffer0"] = {
"source": "internal",

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
Ophyd device for the Aerotech Automation1 IOC's axis-specific position
synchronized output (PSO) interface.
@author: mohacsi_i
"""
from time import sleep
import numpy as np
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
@@ -12,6 +19,8 @@ logger = bec_logger.logger
class AerotechPsoDistanceMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
@@ -21,14 +30,10 @@ class AerotechPsoDistanceMixin(CustomDeviceMixin):
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
@@ -237,7 +242,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param pso_distance: The trigger distance or the array of distances between subsequent points.
:param pso_distance: Distance or array of distances between subsequent trigger points.
:param pso_wavemode: Waveform mode configuration, usually pulsed/toggled (default: pulsed).
:param pso_t_pulse : trigger high duration in pulsed mode (default: 100 us)
:param pso_w_pulse : trigger hold-off time in pulsed mode (default: 200 us)
@@ -276,7 +281,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
new = self.read_configuration()
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
return (old, new)
def bluestage(self) -> None:
"""Bluesky style stage"""
# Stage the PSO distance module and zero counter
@@ -288,25 +293,3 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
if self.dstDistanceVal.get() > 0:
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
# # ########################################################################
# # Bluesky flyer interface
# def prepare(self, distance=None) -> DeviceStatus:
# """ Arm trigger for a synchronous or asynchronous acquisition"""
# if distance is not None:
# # Write a new array
# if isinstance(distance, (float, int)):
# self.dstDistance.set(distance).wait()
# elif isinstance(distance, (np.ndarray, list, tuple)):
# self.dstDistanceArr.set(distance).wait()
# else:
# # Rearm the already configured array
# if isinstance(self._distance_value, (np.ndarray, list, tuple)):
# self.dstArrayRearm.set(1).wait()
# # Start monitoring the counters
# self.dstEventsEna.set("On").wait()
# self.dstCounterEna.set("On").wait()
# status = DeviceStatus(self)
# status.set_finished()
# return status

View File

@@ -1,3 +1,10 @@
# -*- coding: utf-8 -*-
"""
Ophyd device for the Aerotech Automation1 IOC's controller's task management
interface.
@author: mohacsi_i
"""
from time import sleep
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
@@ -12,6 +19,8 @@ logger = bec_logger.logger
class AerotechTasksMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
@@ -27,20 +36,16 @@ class AerotechTasksMixin(CustomDeviceMixin):
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
# logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs:
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs:
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs:
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# Perform bluesky-style configuration
@@ -114,7 +119,7 @@ class aa1Tasks(PSIDeviceBase):
# Common operations
old = self.read_configuration()
self.switch.set("Reset").wait()
# Check if
# Check what we got
if "script_task" in d:
if d['script_task'] < 3 or d['script_task'] > 21:
raise RuntimeError(f"Invalid task index: {d['script_task']}")
@@ -136,7 +141,7 @@ class aa1Tasks(PSIDeviceBase):
def bluestage(self) -> None:
"""Bluesky style stage"""
if self.taskIndex.get() in (0, 1, 2):
logger.error(f"[{self.name}] Woah, launching AeroScript on a system task. Daring today are we?")
logger.error(f"[{self.name}] Launching AeroScript on system task. Daring today are we?")
# Launch and check success
status = self.switch.set("Run", settle_time=0.2)
status.wait()
@@ -151,13 +156,10 @@ class aa1Tasks(PSIDeviceBase):
timestamp_ = 0
task_idx = int(self.taskIndex.get())
def not_running(*args, old_value, value, timestamp, **kwargs):
def not_running(*args, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if value[task_idx] in ["Running", 4] else True
# FIXME: BEC will swallow this exception
# error = bool(value[task_idx] in ["Error", 8])
result = value[task_idx] not in ["Running", 4]
timestamp_ = timestamp
# print(result)
return result
# Subscribe and wait for update

View File

@@ -2,5 +2,3 @@ from .AerotechTasks import aa1Tasks
from .AerotechPso import aa1AxisPsoDistance
from .AerotechDriveDataCollection import aa1AxisDriveDataCollection
from .AerotechAutomation1 import aa1Controller, aa1GlobalVariables, aa1GlobalVariableBindings, aa1AxisIo

View File

@@ -34,26 +34,33 @@ class StdDaqMixin(CustomDeviceMixin):
NOTE: Tomcat might use multiple cameras with their own separate DAQ instances.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
# logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
# NOTE: Scans don't have to fully configure the device
d = {}
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
if 'kwargs' in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info['kwargs']
if 'image_width' in scanargs and scanargs['image_width'] != None:
d['image_width'] = scanargs['image_width']
if 'image_height' in scanargs and scanargs['image_height'] != None:
d['image_height'] = scanargs['image_height']
# NOTE: Scans don't have to fully configure the device
points_total = 1
if 'steps' in scanargs and scanargs['steps'] != None:
points_total *= scanargs['steps']
if 'exp_burst' in scanargs and scanargs['exp_burst'] != None:
points_total *= scanargs['exp_burst']
if 'repeats' in scanargs and scanargs['repeats']!= None:
points_total *= scanargs['repeats']
if points_total != 1:
d['num_points_total'] = points_total
if "daq_num_points" in scanargs:
d["num_points_total"] = scanargs["daq_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs['steps'] is not None:
num_points *= scanargs["steps"]
points_valid = True
elif "exp_burst" in scanargs and scanargs['exp_burst'] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
elif "repeats" in scanargs and scanargs['repeats'] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
# Perform bluesky-style configuration
if len(d) > 0: