Same format for Aeroscript scan

This commit is contained in:
gac-x05la
2024-10-10 17:59:45 +02:00
parent f14205e490
commit daa68c288a

View File

@@ -56,23 +56,29 @@ class TemplatedScanBase(AsyncFlyScanBase):
scan_report_hint = "table"
required_kwargs = ["filename", "subs"]
def __init__(self, *args, parameter: dict = None, **kwargs):
def __init__(self, *, filename=None, filetext=None, filesubs={},
controller="es1_tasks", taskindex=4,
camera="gfclient", camcfg=None,
drvdaq="es1_ddaq", daqcfg=None, daqmode="collect",
preview="daq_stream1",
**kwargs):
"""Executes an AeroScript template as a flyer
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(**kwargs)
self.axis = []
self.num_pos = 0
self.filename = self.caller_kwargs.get("filename", None)
self.scripttext = self.caller_kwargs.get("scripttext", None)
self.subs = self.caller_kwargs.get("subs", {})
self.taskindex = self.caller_kwargs.get("taskindex", 4)
self.camera = self.caller_kwargs.get("camera", 'gfclient')
self.camcfg = self.caller_kwargs.get("camcfg", {})
self.preview = self.caller_kwargs.get("preview", 'daq_stream0')
self.daqname = self.caller_kwargs.get("daqname", None)
self.daqcfg = self.caller_kwargs.get("daqcfg", {})
self.daqmode = self.caller_kwargs.get("daqmode", 'collect')
self.filename = filename
self.filetext = filetext
self.filesubs = filesubs
self.controller = controller
self.taskindex = taskindex
self.camera = camera
self.camcfg = camcfg
self.preview = preview
self.daqname = drvdaq
self.daqcfg = daqcfg
self.daqmode = daqmode
if self.filename is None and self.filetext is None:
raise RuntimeError("Must provide either filename or text to scan")
@@ -80,18 +86,16 @@ class TemplatedScanBase(AsyncFlyScanBase):
def prepare_positions(self):
""" Prepare action: render AeroScript file"""
print("TOMCAT Loading Aeroscript template")
# Load the test file
if self.filename is not None:
filename = os.path.join(os.path.dirname(__file__), '../devices/aerotech/' + self.filename)
logger.info(f"Attempring to load file {filename}")
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
templatetext = f.read()
# Substitute jinja template
tm = jinja2.Template(templatetext)
self.scripttext = tm.render(scan=self.subs)
self.filetext = tm.render(scan=self.filesubs)
yield from super().prepare_positions()
@@ -102,13 +106,13 @@ class TemplatedScanBase(AsyncFlyScanBase):
print("TOMCAT Staging scripted scan (via Jinjad AeroScript)")
# Configure the Aerotech by copying text to controller file and compiling it
taskcfg = {"text": self.scripttext, "filename": "bec.ascript", "taskIndex": self.taskindex}
yield from self.stubs.send_rpc_and_wait("es1_tasks", "configure", taskcfg)
# Configure the camera (usually together wit the DAQ)
yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg)
taskcfg = {"text": self.filetext, "filename": "bec.ascript", "taskIndex": self.taskindex}
yield from self.stubs.send_rpc_and_wait(self.controller, "configure", taskcfg)
# Configure the camera (usually together wit the DAQ)
if self.camera is not None:
yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg)
# Configure the drive data collection
if self.daqname is not None:
yield from self.stubs.send_rpc_and_wait(self.daqname, "configure", self.daqcfg)
@@ -117,21 +121,23 @@ class TemplatedScanBase(AsyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("es1_tasks", "stage")
if self.camera is not None:
yield from self.stubs.send_rpc_and_wait(self.camera, "stage")
if self.daqname:
if self.daqname is not None:
yield from self.stubs.send_rpc_and_wait(self.daqname, "stage")
if self.preview is not None:
yield from self.stubs.send_rpc_and_wait(self.preview, "stage")
# For God, NO!
# yield from super().stage()
# TODO : This should stage the entire beamline
yield from super().stage()
def scan_core(self):
""" The actual scan routine"""
print("TOMCAT scripted scan (via Jinjad AeroScript)")
print("TOMCAT Running scripted scan (via Jinjad AeroScript)")
t_start = time.time()
# Kickoff
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff")
yield from self.stubs.send_rpc_and_wait(self.controller, "kickoff")
# yield from self.stubs.send_rpc_and_wait(self.daqname, "kickoff")
time.sleep(0.5)
# FIXME: Temporary triggering
@@ -140,9 +146,9 @@ class TemplatedScanBase(AsyncFlyScanBase):
# Complete
# FIXME: this will swallow errors
# yield from self.stubs.complete(device="es1_tasks")
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "complete")
st = yield from self.stubs.send_rpc_and_wait(self.controller, "complete")
st.wait()
task_states = yield from self.stubs.send_rpc_and_wait("es1_tasks", "taskStates.get")
task_states = yield from self.stubs.send_rpc_and_wait(self.controller, "taskStates.get")
if task_states[self.taskindex] == 8:
raise RuntimeError(f"Task {self.taskindex} finished in ERROR state")
@@ -152,7 +158,7 @@ class TemplatedScanBase(AsyncFlyScanBase):
time.sleep(0.5)
# Collect
if self.daqmode=="collect":
if self.daqname is not None and self.daqmode=="collect":
positions = yield from self.stubs.send_rpc_and_wait(self.daqname, "collect")
logger.info(f"Finished scan with collected positions: {positions}")
@@ -269,18 +275,6 @@ class GigaStepScanBase(ScanBase):
@@ -297,27 +291,21 @@ class SnapAndStepScanBase(TemplatedScanBase):
Example
-------
>>> scans.snapnstep(range=(0, 180), steps=1800, exp_time=20, exp_burst=5)
Parameters
----------
range : (float, float)
Scan range of the axis.
steps : int, optional
Number of scan steps to cover the range. (default = 10)
exp_time : float, optional [0.01 ... 40]
Exposure time for each frame in [ms]. The IOC fixes the exposure
period to be 2x this long so it doesnt matter. (default = 20)
exp_burst : float, optional
Number of images to be taken for each scan point. (default=1)
>>> scans.snapnstep(-20 160, steps=1800, exp_time=0.005, burst_at_each_point=5)
"""
scan_name = "snapnstep"
scan_report_hint = "table"
required_kwargs = ["range","steps"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
required_kwargs = ["scan_start", "scan_end","steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters" : ["exp_time", "burst_at_each_point", "roix", "roiy"]
}
def __init__(self, scan_start, scan_end, steps,
exp_time=0.005, burst_at_each_point=1,
roix=2016, roiy=2016, **kwargs):
"""Example step scan
Perform a simple step scan with a motor while software triggering the
@@ -327,46 +315,50 @@ class SnapAndStepScanBase(TemplatedScanBase):
"""
# Auto-setup configuration parameters from input
logger.info(parameter)
self.scan_range = parameter['kwargs'].get("range")
self.scan_stepnum = int(parameter['kwargs'].get("steps"))
self.scan_exptime = float(parameter['kwargs'].get("exp_time", 20))
self.scan_expburst = float(parameter['kwargs'].get("exp_burst", 1))
self.scan_roix = int(parameter['kwargs'].get("roix", 2016))
self.scan_roiy = int(parameter['kwargs'].get("roiy", 2016))
self.scan_start = scan_start
self.scan_end = scan_end
self.scan_steps = steps
self.exp_time = exp_time
self.exp_burst = burst_at_each_point
# Synthetic values
self.scan_startpos = self.scan_range[0]
self.scan_stepsize = (self.scan_range[1]-self.scan_range[0])/self.scan_stepnum
self.scan_ntotal = self.scan_stepnum * self.scan_expburst
self.scan_stepsize = (scan_end-scan_start)/steps
self.scan_ntotal = steps * burst_at_each_point
parameter['kwargs']['filename'] = "AerotechSnapAndStepTemplate.ascript"
parameter['kwargs']['subs'] = {
'startpos': self.scan_startpos,
filename = "AerotechSnapAndStepTemplate.ascript"
filesubs = {
'startpos': self.scan_start,
'stepsize': self.scan_stepsize,
'numsteps': self.scan_stepnum,
'exptime': self.scan_exptime*self.scan_expburst/1000
'numsteps': self.scan_steps,
'exptime': self.exp_time*self.exp_burst
}
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
parameter['kwargs']['daqname'] = "es1_ddaq"
parameter['kwargs']['daqcfg'] = {'ntotal': self.scan_ntotal, 'trigger': 1}
daqname = "es1_ddaq"
daqcfg = {'ntotal': self.scan_ntotal, 'trigger': 1}
# Gigafrost config
parameter['kwargs']['camcfg'] = {
camname = "gfclient"
camcfg = {
"ntotal": self.scan_ntotal,
"nimages": self.scan_expburst,
"exposure": self.scan_exptime,
"period": 2*self.scan_exptime,
"pixel_width": self.scan_roix,
"pixel_height": self.scan_roiy
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000*exp_time,
"pixel_width": roix,
"pixel_height": roiy
}
# Parameter validation before scan
if self.scan_stepnum <=0:
raise RuntimeError(f"Requires at least one scan step, got {self.scan_stepnum}")
if self.scan_steps <=0:
raise RuntimeError(f"Requires at least one scan step, got {self.scan_steps}")
# Call super()
super().__init__(parameter=parameter, **kwargs)
super().__init__(
filename=filename, filesubs=filesubs,
controller="es1_tasks", taskindex=4,
camera=camname, camcfg=camcfg,
drvdaq=daqname, daqcfg=daqcfg, daqmode="collect",
preview="daq_stream1",
**kwargs)
# def scan_core(self):
# """ Don't do the scan during testing"""