diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index a8ba010..9039ab5 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -55,13 +55,19 @@ femto_mean_curr: # enabled: false # readoutPriority: monitored -# es1_tasks: -# description: 'AA1 task management interface' -# deviceClass: tomcat_bec.devices.aa1Tasks -# deviceConfig: {prefix: 'X02DA-ES1-SMP1:TASK:'} -# onFailure: buffer -# enabled: true -# readoutPriority: monitored +es1_tasks: + description: 'Automation1 task management interface' + deviceClass: tomcat_bec.devices.aa1Tasks + deviceConfig: + prefix: 'X02DA-ES1-SMP1:TASK:' + deviceTags: + - es1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: monitored + softwareTrigger: false + # es1_psod: # description: 'AA1 PSO output interface' @@ -71,13 +77,19 @@ femto_mean_curr: # enabled: true # readoutPriority: monitored -# es1_ddaq: -# description: 'AA1 drive data collection interface' -# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection -# deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'} -# onFailure: buffer -# enabled: true -# readoutPriority: monitored +es1_ddaq: + description: 'Automation1 position recording interface' + deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection + deviceConfig: + prefix: 'X02DA-ES1-SMP1:ROTY:DDC:' + deviceTags: + - es1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: monitored + softwareTrigger: false + #camera: # description: Grashopper Camera diff --git a/tomcat_bec/devices/aerotech/AerotechAutomation1.py b/tomcat_bec/devices/aerotech/AerotechAutomation1.py index 064a6e1..4ed55ab 100644 --- a/tomcat_bec/devices/aerotech/AerotechAutomation1.py +++ b/tomcat_bec/devices/aerotech/AerotechAutomation1.py @@ -6,12 +6,14 @@ import numpy as np from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import DeviceStatus, Status, StatusBase, SubscriptionStatus -from AerotechAutomation1Enums import ( - DataCollectionFrequency, - DataCollectionMode, - DriveDataCaptureInput, - DriveDataCaptureTrigger, -) +try: + from AerotechAutomation1Enums import ( + DriveDataCaptureInput, + DriveDataCaptureTrigger, + ) +except ModuleNotFoundError: + from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureInput + from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureTrigger try: from bec_lib import bec_logger diff --git a/tomcat_bec/scans/tomcat_scanbase.py b/tomcat_bec/scans/tomcat_scanbase.py new file mode 100644 index 0000000..f2c6981 --- /dev/null +++ b/tomcat_bec/scans/tomcat_scanbase.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +""" Tomcat scan base class examples + +A collection of example scan base classes using Automation1 rotation stage, +GigaFrost camera and the StandardDAQ pipeline. + +Created on Mon Sep 16 16:45:11 2024 + +@author: mohacsi_i +""" +import jinja2 +import time + +from bec_lib import bec_logger +from scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase + +logger = bec_logger.logger + + + +class TemplatedScanBase(AsyncFlyScanBase): + """ Base class for templated scans + + Low-level base class for templated AeroScript scans at the Tomcat beamlines. + It sets the order of operations between aerotech, gigafrost and the standard + DAQ. But as a base class, it leaves ample freedom for individual + hardware configurations and scan implementations. + + Example + ------- + >>> scans.aeroscript_scan_base(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1}) + + Parameters + ---------- + filename: str + Filename of the Aeroscript template file. This or filetext ismandatory. + scripttext: str + Raw AeroScript file text of the program. This or filename is mandatory. + subs: dict + Substitutions to the AeroScript template file. + taskindex: int + Task index tor the Aeroscript program execution. (default = 4) + camera: str + Device name of the used camera. (default = gfclient) + camcfg: str + Camera configuration. (default = {}) + preview: str + Device name of the live stream preview. (default = daq_stream0) + daqname: str + Device name for position recording. (default = None) + daqmode: str + Operation mode for the position recording. (default = collect) + """ + scan_name = "aeroscript_scan_base" + scan_report_hint = "table" + required_kwargs = ["filename", "subs"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Executes an AeroScript template as a flyer + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = [] + self.num_pos = 0 + + self.filename = self.caller_kwargs.get("filename", None) + self.scripttext = self.caller_kwargs.get("scripttext", None) + self.subs = self.caller_kwargs.get("subs", {}) + self.taskindex = self.caller_kwargs.get("taskindex", 4) + self.camera = self.caller_kwargs.get("camera", 'gfclient') + self.camcfg = self.caller_kwargs.get("camcfg", {}) + self.preview = self.caller_kwargs.get("preview", 'daq_stream0') + self.daqname = self.caller_kwargs.get("daqname", None) + self.daqcfg = self.caller_kwargs.get("daqcfg", {}) + self.daqmode = self.caller_kwargs.get("daqmode", 'collect') + + if self.filename is None and self.filetext is None: + raise RuntimeError("Must provide either filename or text to scan") + + + def prepare_positions(self): + """ Prepare action: render AeroScript file""" + print("TOMCAT Loading Aeroscript template") + # Load the test file + if self.filename is not None: + with open(self.filename) as f: + templatetext = f.read() + + # Substitute jinja template + + tm = jinja2.Template(templatetext) + self.scripttext = tm.render(scan=self.subs) + + yield from self.stubs.prepare_positions() + + + def stage(self): + """ Configure and stage all devices""" + + print("TOMCAT Staging sequeence scan (via Jinjad AeroScript)") + + # Configure the Aerotech by copying text to controller file and compiling it + taskcfg = {"text": self.scripttext, "filename": "bec.ascript", "taskIndex": self.taskindex} + yield from self.stubs.send_rpc_and_wait("es1_tasks", "configure", taskcfg) + + # Configure the camera (usually together wit the DAQ) + yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg) + + # Configure the camera (usually together wit the DAQ) + if self.daqname is not None: + yield from self.stubs.send_rpc_and_wait(self.daqname, "configure", self.daqcfg) + + # ################################################################################### + # Staging + yield from self.stubs.send_rpc_and_wait("es1_tasks", "stage") + if self.camera is not None: + yield from self.stubs.send_rpc_and_wait(self.camera, "stage") + if self.daqname: + yield from self.stubs.send_rpc_and_wait(self.daqname, "stage") + if self.preview is not None: + yield from self.stubs.send_rpc_and_wait(self.preview, "stage") + # For God, NO! + # yield from super().stage() + + + def scan_core(self): + """ The actual scan routine""" + print("TOMCAT Sequeence scan (via Jinjad AeroScript)") + t_start = time.time() + + # Kickoff + st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff") + st.wait() + time.sleep(0.5) + + # Complete + yield from self.stubs.complete(device="es1_tasks") + + t_end = time.time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + time.sleep(0.5) + + # Collect + if self.daqmode=="collect": + st = yield from self.stubs.send_rpc_and_wait(self.daqname, "collect") + st.wait() + + + def cleanup(self): + """Set scan progress to 1 to finish the scan""" + self.num_pos = self.pointID + return super().cleanup() + + + + +class SnapAndStepScanBase(TemplatedScanBase): + """ Snap'n Step scan base class + + Example base class for AeroScript-based high speed step scans. + """ + scan_name = "snapnstep" + scan_report_hint = "table" + required_kwargs = ["range","steps"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Example step scan + + Perform a simple step scan with a motor while software triggering the + gigafrost burst sequence at each point and recording it to the StdDAQ. + Actually only the configuration is gigafrost specific, everything else + is just using standard Bluesky event model. + + Example + ------- + >>> scans.snapnstep(range=(0, 180), steps=1800, exp_time=20, exp_burst=5) + + Parameters + ---------- + range : (float, float) + Scan range of the axis. + steps : int, optional + Number of scan steps to cover the range. (default = 10) + exp_time : float, optional [0.01 ... 40] + Exposure time for each frame in [ms]. The IOC fixes the exposure + period to be 2x this long so it doesnt matter. (default = 20) + exp_burst : float, optional + Number of images to be taken for each scan point. (default=1) + """ + # Auto-setup configuration parameters from input + self.scan_range = parameter.get("range") + self.scan_stepnum = int(parameter.get("steps")) + self.scan_exptime = float(parameter.get("exp_time", 20)) + self.scan_expburst = float(parameter.get("exp_burst", 1)) + self.scan_roix = int(parameter.get("roix", 2016)) + self.scan_roiy = int(parameter.get("roiy", 2016)) + + # Synthetic values + self.scan_startpos = self.scan_range[0] + self.scan_stepsize = (self.scan_range[1]-self.scan_range[0])/self.scan_stepnum + self.scan_ntotal = self.scan_stepnum * self.scan_expburst + + parameter['filename'] = "AerotechSnapAndStepTemplate.ascript" + parameter['subs'] = { + 'startpos': self.scan_startpos, + 'stepsize': self.scan_stepsize, + 'numsteps': self.scan_stepnum, + 'exptime': self.scan_exptime*self.scan_expburst + } + # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 + parameter['daqname'] = "es1_ddaq" + parameter['daqcfg'] = {'ntotal': self.scan_ntotal, 'trigger': 1} + # Gigafrost config + parameter['camcfg'] = { + "ntotal": self.scan_ntotal, + "nimages": self.scan_expburst, + "exposure": self.scan_exptime, + "period": 2*self.scan_exptime, + "pixel_width": self.scan_roix, + "pixel_height": self.scan_roiy + } + + # Parameter validation before scan + if self.scan_stepnum <=0: + raise RuntimeError(f"Requires at least one scan step, got {self.scan_stepnum}") + + # Call super() + super().__init__(parameter=parameter, **kwargs) + + + def scan_core(self): + """ Don't do the scan during testing""" + raise RuntimeError("You shall NOT PASSS!!!") \ No newline at end of file