Re-added automation1 to database

This commit is contained in:
gac-x05la
2024-09-16 17:06:27 +02:00
parent 4463857b6d
commit dfdc458bbc
3 changed files with 272 additions and 20 deletions

View File

@@ -55,13 +55,19 @@ femto_mean_curr:
# enabled: false
# readoutPriority: monitored
# es1_tasks:
# description: 'AA1 task management interface'
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig: {prefix: 'X02DA-ES1-SMP1:TASK:'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
es1_tasks:
description: 'Automation1 task management interface'
deviceClass: tomcat_bec.devices.aa1Tasks
deviceConfig:
prefix: 'X02DA-ES1-SMP1:TASK:'
deviceTags:
- es1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# es1_psod:
# description: 'AA1 PSO output interface'
@@ -71,13 +77,19 @@ femto_mean_curr:
# enabled: true
# readoutPriority: monitored
# es1_ddaq:
# description: 'AA1 drive data collection interface'
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
es1_ddaq:
description: 'Automation1 position recording interface'
deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
deviceConfig:
prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
deviceTags:
- es1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
#camera:
# description: Grashopper Camera

View File

@@ -6,12 +6,14 @@ import numpy as np
from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, Status, StatusBase, SubscriptionStatus
from AerotechAutomation1Enums import (
DataCollectionFrequency,
DataCollectionMode,
DriveDataCaptureInput,
DriveDataCaptureTrigger,
)
try:
from AerotechAutomation1Enums import (
DriveDataCaptureInput,
DriveDataCaptureTrigger,
)
except ModuleNotFoundError:
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureInput
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureTrigger
try:
from bec_lib import bec_logger

View File

@@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
""" Tomcat scan base class examples
A collection of example scan base classes using Automation1 rotation stage,
GigaFrost camera and the StandardDAQ pipeline.
Created on Mon Sep 16 16:45:11 2024
@author: mohacsi_i
"""
import jinja2
import time
from bec_lib import bec_logger
from scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
class TemplatedScanBase(AsyncFlyScanBase):
""" Base class for templated scans
Low-level base class for templated AeroScript scans at the Tomcat beamlines.
It sets the order of operations between aerotech, gigafrost and the standard
DAQ. But as a base class, it leaves ample freedom for individual
hardware configurations and scan implementations.
Example
-------
>>> scans.aeroscript_scan_base(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1})
Parameters
----------
filename: str
Filename of the Aeroscript template file. This or filetext ismandatory.
scripttext: str
Raw AeroScript file text of the program. This or filename is mandatory.
subs: dict
Substitutions to the AeroScript template file.
taskindex: int
Task index tor the Aeroscript program execution. (default = 4)
camera: str
Device name of the used camera. (default = gfclient)
camcfg: str
Camera configuration. (default = {})
preview: str
Device name of the live stream preview. (default = daq_stream0)
daqname: str
Device name for position recording. (default = None)
daqmode: str
Operation mode for the position recording. (default = collect)
"""
scan_name = "aeroscript_scan_base"
scan_report_hint = "table"
required_kwargs = ["filename", "subs"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Executes an AeroScript template as a flyer
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = []
self.num_pos = 0
self.filename = self.caller_kwargs.get("filename", None)
self.scripttext = self.caller_kwargs.get("scripttext", None)
self.subs = self.caller_kwargs.get("subs", {})
self.taskindex = self.caller_kwargs.get("taskindex", 4)
self.camera = self.caller_kwargs.get("camera", 'gfclient')
self.camcfg = self.caller_kwargs.get("camcfg", {})
self.preview = self.caller_kwargs.get("preview", 'daq_stream0')
self.daqname = self.caller_kwargs.get("daqname", None)
self.daqcfg = self.caller_kwargs.get("daqcfg", {})
self.daqmode = self.caller_kwargs.get("daqmode", 'collect')
if self.filename is None and self.filetext is None:
raise RuntimeError("Must provide either filename or text to scan")
def prepare_positions(self):
""" Prepare action: render AeroScript file"""
print("TOMCAT Loading Aeroscript template")
# Load the test file
if self.filename is not None:
with open(self.filename) as f:
templatetext = f.read()
# Substitute jinja template
tm = jinja2.Template(templatetext)
self.scripttext = tm.render(scan=self.subs)
yield from self.stubs.prepare_positions()
def stage(self):
""" Configure and stage all devices"""
print("TOMCAT Staging sequeence scan (via Jinjad AeroScript)")
# Configure the Aerotech by copying text to controller file and compiling it
taskcfg = {"text": self.scripttext, "filename": "bec.ascript", "taskIndex": self.taskindex}
yield from self.stubs.send_rpc_and_wait("es1_tasks", "configure", taskcfg)
# Configure the camera (usually together wit the DAQ)
yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg)
# Configure the camera (usually together wit the DAQ)
if self.daqname is not None:
yield from self.stubs.send_rpc_and_wait(self.daqname, "configure", self.daqcfg)
# ###################################################################################
# Staging
yield from self.stubs.send_rpc_and_wait("es1_tasks", "stage")
if self.camera is not None:
yield from self.stubs.send_rpc_and_wait(self.camera, "stage")
if self.daqname:
yield from self.stubs.send_rpc_and_wait(self.daqname, "stage")
if self.preview is not None:
yield from self.stubs.send_rpc_and_wait(self.preview, "stage")
# For God, NO!
# yield from super().stage()
def scan_core(self):
""" The actual scan routine"""
print("TOMCAT Sequeence scan (via Jinjad AeroScript)")
t_start = time.time()
# Kickoff
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff")
st.wait()
time.sleep(0.5)
# Complete
yield from self.stubs.complete(device="es1_tasks")
t_end = time.time()
t_elapsed = t_end - t_start
print(f"Elapsed scan time: {t_elapsed}")
time.sleep(0.5)
# Collect
if self.daqmode=="collect":
st = yield from self.stubs.send_rpc_and_wait(self.daqname, "collect")
st.wait()
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = self.pointID
return super().cleanup()
class SnapAndStepScanBase(TemplatedScanBase):
""" Snap'n Step scan base class
Example base class for AeroScript-based high speed step scans.
"""
scan_name = "snapnstep"
scan_report_hint = "table"
required_kwargs = ["range","steps"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Example step scan
Perform a simple step scan with a motor while software triggering the
gigafrost burst sequence at each point and recording it to the StdDAQ.
Actually only the configuration is gigafrost specific, everything else
is just using standard Bluesky event model.
Example
-------
>>> scans.snapnstep(range=(0, 180), steps=1800, exp_time=20, exp_burst=5)
Parameters
----------
range : (float, float)
Scan range of the axis.
steps : int, optional
Number of scan steps to cover the range. (default = 10)
exp_time : float, optional [0.01 ... 40]
Exposure time for each frame in [ms]. The IOC fixes the exposure
period to be 2x this long so it doesnt matter. (default = 20)
exp_burst : float, optional
Number of images to be taken for each scan point. (default=1)
"""
# Auto-setup configuration parameters from input
self.scan_range = parameter.get("range")
self.scan_stepnum = int(parameter.get("steps"))
self.scan_exptime = float(parameter.get("exp_time", 20))
self.scan_expburst = float(parameter.get("exp_burst", 1))
self.scan_roix = int(parameter.get("roix", 2016))
self.scan_roiy = int(parameter.get("roiy", 2016))
# Synthetic values
self.scan_startpos = self.scan_range[0]
self.scan_stepsize = (self.scan_range[1]-self.scan_range[0])/self.scan_stepnum
self.scan_ntotal = self.scan_stepnum * self.scan_expburst
parameter['filename'] = "AerotechSnapAndStepTemplate.ascript"
parameter['subs'] = {
'startpos': self.scan_startpos,
'stepsize': self.scan_stepsize,
'numsteps': self.scan_stepnum,
'exptime': self.scan_exptime*self.scan_expburst
}
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
parameter['daqname'] = "es1_ddaq"
parameter['daqcfg'] = {'ntotal': self.scan_ntotal, 'trigger': 1}
# Gigafrost config
parameter['camcfg'] = {
"ntotal": self.scan_ntotal,
"nimages": self.scan_expburst,
"exposure": self.scan_exptime,
"period": 2*self.scan_exptime,
"pixel_width": self.scan_roix,
"pixel_height": self.scan_roiy
}
# Parameter validation before scan
if self.scan_stepnum <=0:
raise RuntimeError(f"Requires at least one scan step, got {self.scan_stepnum}")
# Call super()
super().__init__(parameter=parameter, **kwargs)
def scan_core(self):
""" Don't do the scan during testing"""
raise RuntimeError("You shall NOT PASSS!!!")